Java使用定时任务开始-竣事活动

打印 上一主题 下一主题

主题 504|帖子 504|积分 1512

        有一种场景,好比抢购活动,活动的开始肯定不是到点后手动点击开始按钮的,而是通过配置活动开始时间和竣事时间,然后开启定时任务,通过定时任务来控制活动的开始和竣事。

1. 活动上架--触发定时任务的开启

  1. @Override
  2. @Transactional(rollbackFor = Exception.class)
  3. public void publishIntegralActivity(List<Long> idList) {
  4.     // 修改上架状态
  5.     String accountName = SessionUtils.getAccountNameSession();
  6.     integralActivityDao.updatePublishStatus(idList, PublishStatusEnum.PUBLISH_YES.getCode(), accountName);
  7.     // 活动开始定时任务
  8.     for (Long id : idList) {
  9.         IntegralActivity integralActivity = integralActivityDao.getById(id);
  10.         integralActivityCronService.addStartCron(id, integralActivity.getStartTime());
  11.     }
  12. }
复制代码
2. 开启活动开始定时任务

  1. public void addStartCron(long id, Date startTime) {
  2.     long millSecond = startTime.getTime() - System.currentTimeMillis();
  3.     ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
  4.         @Override
  5.         public void run() {
  6.             log.debug("integral activity[id=" + id + "] start cron begin execute");
  7.             String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
  8.             String value = UUID.randomUUID().toString().replace("-", "");
  9.             if (!getExecuteLock(key, value)) {
  10.                 return;
  11.             }
  12.             try {
  13.                 IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  14.                 queryDto.setId(id);
  15.                 List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  16.                 if (CollectionUtils.isEmpty(activityList) ||
  17.                         !ActivityStatusEnum.NOT_START.getCode().equals(activityList.get(0).getActivityStatus()) ||
  18.                         startTime.getTime() != activityList.get(0).getStartTime().getTime()) {
  19.                     log.warn("when execute start cron, integral activity[id=" + id + "] not exists or status not match");
  20.                     lockUtils.unLock(key, value);
  21.                     return;
  22.                 }
  23.                 // 更改活动状态
  24.                 integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activityList.get(0).getEndTime());
  25.                 // 清除活动结束定时任务
  26.                 clearCronById(ACTIVITY_END_CRON_PREFX + id);
  27.                 // 添加活动结束定时任务
  28.                 addEndCron(id, activityList.get(0).getEndTime());
  29.                 lockUtils.unLock(key, value);
  30.             } catch (Exception e) {
  31.                 log.error("integral activity[id=" + id + "] start error, retry", e);
  32.                 lockUtils.unLock(key, value);
  33.                 addStartCron(id, startTime);
  34.             }
  35.         }
  36.     }, millSecond, TimeUnit.MILLISECONDS);
  37.     // 加入任务
  38.     currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
  39. }
复制代码
3. 开启定时任务时要加锁,防止并发哀求

  1. private boolean getExecuteLock(String key, String value) {
  2.     for (int i = 1; i <= 5; i++) {
  3.         try {
  4.             if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
  5.                 return true;
  6.             }
  7.         } catch (Exception e) {
  8.             log.warn("redis lock[key=" + key + "] get error, continue");
  9.         }
  10.     }
  11.     return false;
  12. }
复制代码
4. 开启活动竣事定时任务

  1. private void addEndCron(long id, Date endTime) {
  2.     long millSecond = endTime.getTime() - System.currentTimeMillis();
  3.     log.debug("integral activity[id=" + id + "] end cron begin execute");
  4.     ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
  5.         @Override
  6.         public void run() {
  7.             try {
  8.                 IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  9.                 queryDto.setId(id);
  10.                 List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  11.                 if (CollectionUtils.isEmpty(activityList) ||
  12.                         !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activityList.get(0).getActivityStatus()) ||
  13.                         endTime.getTime() != activityList.get(0).getEndTime().getTime()) {
  14.                     return;
  15.                 }
  16.                 // 更改活动状态
  17.                 integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
  18.                 // 更改上架状态
  19.                 integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
  20.             } catch (Exception e) {
  21.                 log.error("integral activity[id=" + id + "] end error, retry", e);
  22.                 addEndCron(id, endTime);
  23.             }
  24.         }
  25.     }, millSecond, TimeUnit.MILLISECONDS);
  26.     // 加入任务
  27.     currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
  28. }
复制代码
5.  活动下架

  1. @Override
  2. @Transactional(rollbackFor = Exception.class)
  3. public void offIntegralActivity(List<Long> idList) {
  4.     String accountName = SessionUtils.getAccountNameSession();
  5.     integralActivityDao.updateActivityStatus(idList, ActivityStatusEnum.EARLY_STOP.getCode(), null, accountName, new Date());
  6.     integralActivityDao.updatePublishStatus(idList, PublishStatusEnum.PUBLISH_NO.getCode(), accountName);
  7.     // 活动结束定时任务
  8.     for (Long id : idList) {
  9.         integralActivityCronService.clearActivityCron(id);
  10.     }
  11. }
复制代码
6. 活动下架后要清除定时任务

  1. public void clearActivityCron(long id) {
  2.     this.clearCronById(ACTIVITY_START_CRON_PREFX + id);
  3.     this.clearCronById(ACTIVITY_END_CRON_PREFX + id);
  4. }
  5. @SuppressWarnings("rawtypes")
  6. private void clearCronById(String cronId) {
  7.     if (currentCronMap.get(cronId) == null) {
  8.         log.debug("cron[id=" + cronId + "] not exists");
  9.     }
  10.     Future future = currentCronMap.get(cronId);
  11.     if (null != future && !future.isDone()) {
  12.         future.cancel(true); // 正在运行是否干扰
  13.     }
  14.     currentCronMap.remove(cronId);
  15. }
复制代码
7. 程序启动时开启全部活动的定时任务

  1. @Component
  2. @Slf4j
  3. public class InitIntegralActivityCron implements CommandLineRunner {
  4.    
  5.     @Autowired
  6.     private LockUtils lockUtils;
  7.     @Autowired
  8.     private IntegralActivityCronService integralActivityCronService;
  9.     @Override
  10.     public void run(String... args) throws Exception {
  11.         // 初始化积分活动定时任务
  12.         String key = RedisConstant.LOAD_INTEGRAL_ACTIVITY_CRON;
  13.         String value = UUID.randomUUID().toString().replace("-", "");
  14.         if (!lockUtils.getLock(key, value, 30 * 60 * 1000)) {
  15.             log.info("integral activity cron load lock get failed, return!!!");
  16.             return;
  17.         }
  18.         try {
  19.             if (!integralActivityCronService.createAllActivityCron()) {
  20.                 lockUtils.unLock(key, value);
  21.                 log.error("partial integral activity cron load error, system will exit");
  22.                 System.exit(0);
  23.             }
  24.         } catch (Exception e) {
  25.             log.error("integral activity cron load error, system will exit", e);
  26.             lockUtils.unLock(key, value);
  27.             System.exit(0);
  28.         }
  29.     }
  30. }
复制代码
  1. @Override
  2. public boolean createAllActivityCron() {
  3.     boolean isAllSuccess = true;// 是否都创建成功
  4.     List<String> activityStatusList = new ArrayList<String>();
  5.     activityStatusList.add(ActivityStatusEnum.NOT_START.getCode());
  6.     activityStatusList.add(ActivityStatusEnum.IN_PROGRESS.getCode());
  7.     IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  8.     queryDto.setActivityStatusList(activityStatusList);
  9.     queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
  10.     List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  11.     if (CollectionUtils.isNotEmpty(activityList)) {
  12.         for (IntegralActivity integralActivity : activityList) {
  13.             long id = integralActivity.getId();
  14.             try {
  15.                 if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
  16.                     this.addStartCron(id, integralActivity.getStartTime());
  17.                 } else {
  18.                     this.addEndCron(id, integralActivity.getEndTime());
  19.                 }
  20.             } catch (Exception e) {
  21.                 isAllSuccess = false;
  22.                 log.error("integral activity[id=" + id + "] cron create error", e);
  23.             }
  24.         }
  25.     }
  26.     log.info("create integral activity cron end, activity count =" + activityList.size() + " " + (isAllSuccess ? "all success" : "partial success"));
  27.     return isAllSuccess;
  28. }
复制代码

上面是定时任务的完整流程,下面补充完整代码:
1. 定时任务完整代码

  1. @Service
  2. @Slf4j
  3. public class IntegralActivityCronServiceImpl implements IntegralActivityCronService {
  4.    
  5.     // 执行定时对象池
  6.     private ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(2);
  7.     // 所有的定时任务
  8.     @SuppressWarnings("rawtypes")
  9.     private static Map<String, Future> currentCronMap = new HashMap<String, Future>();
  10.     /**
  11.      * 活动开始定时任务ID前缀
  12.      */
  13.     private static final String ACTIVITY_START_CRON_PREFX = "ACTIVITY_START:";
  14.     /**
  15.      * 活动结束定时任务ID前缀
  16.      */
  17.     private static final String ACTIVITY_END_CRON_PREFX = "ACTIVITY_END:";
  18.     @Autowired
  19.     private IntegralActivityDao integralActivityDao;
  20.     @Autowired
  21.     private LockUtils lockUtils;
  22.     @Override
  23.     public boolean createAllActivityCron() {
  24.         boolean isAllSuccess = true;// 是否都创建成功
  25.         List<String> activityStatusList = new ArrayList<String>();
  26.         activityStatusList.add(ActivityStatusEnum.NOT_START.getCode());
  27.         activityStatusList.add(ActivityStatusEnum.IN_PROGRESS.getCode());
  28.         IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  29.         queryDto.setActivityStatusList(activityStatusList);
  30.         queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
  31.         List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  32.         if (CollectionUtils.isNotEmpty(activityList)) {
  33.             for (IntegralActivity integralActivity : activityList) {
  34.                 long id = integralActivity.getId();
  35.                 try {
  36.                     if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
  37.                         this.addStartCron(id, integralActivity.getStartTime());
  38.                     } else {
  39.                         this.addEndCron(id, integralActivity.getEndTime());
  40.                     }
  41.                 } catch (Exception e) {
  42.                     isAllSuccess = false;
  43.                     log.error("integral activity[id=" + id + "] cron create error", e);
  44.                 }
  45.             }
  46.         }
  47.         log.info("create integral activity cron end, activity count =" + activityList.size() + " " + (isAllSuccess ? "all success" : "partial success"));
  48.         return isAllSuccess;
  49.     }
  50.     /**
  51.      * 添加启动定时任务
  52.      *
  53.      * @param id
  54.      * @param startTime
  55.      */
  56.     public void addStartCron(long id, Date startTime) {
  57.         long millSecond = startTime.getTime() - System.currentTimeMillis();
  58.         ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
  59.             @Override
  60.             public void run() {
  61.                 log.debug("integral activity[id=" + id + "] start cron begin execute");
  62.                 String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
  63.                 String value = UUID.randomUUID().toString().replace("-", "");
  64.                 if (!getExecuteLock(key, value)) {
  65.                     return;
  66.                 }
  67.                 try {
  68.                     IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  69.                     queryDto.setId(id);
  70.                     List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  71.                     if (CollectionUtils.isEmpty(activityList) ||
  72.                             !ActivityStatusEnum.NOT_START.getCode().equals(activityList.get(0).getActivityStatus()) ||
  73.                             startTime.getTime() != activityList.get(0).getStartTime().getTime()) {
  74.                         log.warn("when execute start cron, integral activity[id=" + id + "] not exists or status not match");
  75.                         lockUtils.unLock(key, value);
  76.                         return;
  77.                     }
  78.                     // 更改活动状态
  79.                     integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activityList.get(0).getEndTime());
  80.                     // 清除活动结束定时任务
  81.                     clearCronById(ACTIVITY_END_CRON_PREFX + id);
  82.                     // 添加活动结束定时任务
  83.                     addEndCron(id, activityList.get(0).getEndTime());
  84.                     lockUtils.unLock(key, value);
  85.                 } catch (Exception e) {
  86.                     log.error("integral activity[id=" + id + "] start error, retry", e);
  87.                     lockUtils.unLock(key, value);
  88.                     addStartCron(id, startTime);
  89.                 }
  90.             }
  91.         }, millSecond, TimeUnit.MILLISECONDS);
  92.         // 加入任务
  93.         currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
  94.     }
  95.     /**
  96.      * 添加结束定时任务
  97.      *
  98.      * @param id
  99.      * @param endTime
  100.      */
  101.     private void addEndCron(long id, Date endTime) {
  102.         long millSecond = endTime.getTime() - System.currentTimeMillis();
  103.         log.debug("integral activity[id=" + id + "] end cron begin execute");
  104.         ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
  105.             @Override
  106.             public void run() {
  107.                 try {
  108.                     IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
  109.                     queryDto.setId(id);
  110.                     List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
  111.                     if (CollectionUtils.isEmpty(activityList) ||
  112.                             !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activityList.get(0).getActivityStatus()) ||
  113.                             endTime.getTime() != activityList.get(0).getEndTime().getTime()) {
  114.                         return;
  115.                     }
  116.                     // 更改活动状态
  117.                     integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
  118.                     // 更改上架状态
  119.                     integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
  120.                 } catch (Exception e) {
  121.                     log.error("integral activity[id=" + id + "] end error, retry", e);
  122.                     addEndCron(id, endTime);
  123.                 }
  124.             }
  125.         }, millSecond, TimeUnit.MILLISECONDS);
  126.         // 加入任务
  127.         currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
  128.     }
  129.     /**
  130.      * 清除活动定时任务
  131.      *
  132.      * @param id
  133.      */
  134.     public void clearActivityCron(long id) {
  135.         this.clearCronById(ACTIVITY_START_CRON_PREFX + id);
  136.         this.clearCronById(ACTIVITY_END_CRON_PREFX + id);
  137.     }
  138.     /**
  139.      * 通过id清除任务
  140.      *
  141.      * @param cronId
  142.      */
  143.     @SuppressWarnings("rawtypes")
  144.     private void clearCronById(String cronId) {
  145.         if (currentCronMap.get(cronId) == null) {
  146.             log.debug("cron[id=" + cronId + "] not exists");
  147.         }
  148.         Future future = currentCronMap.get(cronId);
  149.         if (null != future && !future.isDone()) {
  150.             future.cancel(true); // 正在运行是否干扰
  151.         }
  152.         currentCronMap.remove(cronId);
  153.     }
  154.    
  155.     /**
  156.      * 获取活动执行锁
  157.      *
  158.      * @param key
  159.      * @param value
  160.      * @return
  161.      */
  162.     private boolean getExecuteLock(String key, String value) {
  163.         for (int i = 1; i <= 5; i++) {
  164.             try {
  165.                 if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
  166.                     return true;
  167.                 }
  168.             } catch (Exception e) {
  169.                 log.warn("redis lock[key=" + key + "] get error, continue");
  170.             }
  171.         }
  172.         return false;
  173.     }
  174. }
复制代码
2. Redis工具类

  1. @Component()
  2. public class LockUtils {
  3.     @Resource
  4.     private RedisTemplate<String, Object> redisTemplate;
  5.     public LockUtils() {
  6.     }
  7.     public boolean getLock(String key) {
  8.         return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", Duration.ofSeconds(60L));
  9.     }
  10.     public boolean getLock(String key, String value) {
  11.         return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, Duration.ofSeconds(60L));
  12.     }
  13.     public boolean unLock(String key) {
  14.         return this.redisTemplate.delete("lock_key_" + key);
  15.     }
  16.     public boolean unLock(String key, String value) {
  17.         if (value == null) {
  18.             return this.unLock(key);
  19.         } else {
  20.             return value.equals(this.redisTemplate.opsForValue().get("lock_key_" + key)) ? this.redisTemplate.delete("lock_key_" + key) : false;
  21.         }
  22.     }
  23.     public boolean getLock(String key, long expireTime) {
  24.         return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", expireTime, TimeUnit.MILLISECONDS);
  25.     }
  26.     public boolean getLock(String key, String value, long expireTime) {
  27.         return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, expireTime, TimeUnit.MILLISECONDS);
  28.     }
  29. }
复制代码
3. 活动状态枚举类

  1. public enum ActivityStatusEnum {
  2.     NOT_START("1", "未开始"),
  3.     IN_PROGRESS("2", "进行中"),
  4.     FINISHED("3", "已结束"),
  5.     EARLY_STOP("4", "提前结束");
  6.    
  7.     private String code;
  8.     private String value;
  9.     private ActivityStatusEnum(String code, String value) {
  10.         this.code = code;
  11.         this.value = value;
  12.     }
  13.     public String getCode() {
  14.         return code;
  15.     }
  16.     public void setCode(String code) {
  17.         this.code = code;
  18.     }
  19.     public String getValue() {
  20.         return value;
  21.     }
  22.     public void setValue(String value) {
  23.         this.value = value;
  24.     }
  25.     /**
  26.      * 根据code获取value
  27.      * @param code
  28.      * @return
  29.      */
  30.     public static String getValueByCode(String code) {
  31.         if (StringUtils.isEmpty(code)) {
  32.             return "";
  33.         }
  34.         for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
  35.             if (item.getCode().equals(code)) {
  36.                 return item.getValue();
  37.             }
  38.         }
  39.         return "";
  40.     }
  41.    
  42.     /**
  43.      * 根据code获取value
  44.      * @param code
  45.      * @return
  46.      */
  47.     public static String getCodeByValue(String value) {
  48.         if (StringUtils.isEmpty(value)) {
  49.             return "";
  50.         }
  51.         for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
  52.             if (item.getValue().equals(value)) {
  53.                 return item.getCode();
  54.             }
  55.         }
  56.         return "";
  57.     }
  58. }
复制代码
其它业务代码没有参考价值不再贴出。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

写过一篇

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表