有一种场景,比如抢购活动:活动的开始肯定不是到点后手动点击开始按钮,而是先配置活动的开始时间和结束时间,然后开启定时任务,通过定时任务来控制活动的开始和结束。
1. 活动上架--触发定时任务的开启
- /**
-  * Publishes the given integral activities and schedules a start task for each of them.
-  *
-  * <p>Every activity is loaded and validated before the first task is scheduled, so an id that
-  * cannot be resolved aborts the call (rolling the transaction back) without leaving start tasks
-  * queued for the ids that preceded it in the list.
-  *
-  * <p>NOTE(review): the tasks are still queued before the transaction commits; the start task
-  * re-checks the activity status when it fires, but it does not re-check the publish status.
-  *
-  * @param idList ids of the activities to publish
-  * @throws IllegalStateException if an id does not resolve to an activity with a start time
-  */
- @Override
- @Transactional(rollbackFor = Exception.class)
- public void publishIntegralActivity(List<Long> idList) {
-     // Flip the publish status first.
-     String accountName = SessionUtils.getAccountNameSession();
-     integralActivityDao.updatePublishStatus(idList, PublishStatusEnum.PUBLISH_YES.getCode(), accountName);
-     // Resolve and validate every activity before scheduling anything.
-     List<IntegralActivity> activityList = new java.util.ArrayList<>(idList.size());
-     for (Long id : idList) {
-         IntegralActivity integralActivity = integralActivityDao.getById(id);
-         if (integralActivity == null || integralActivity.getStartTime() == null) {
-             throw new IllegalStateException(
-                     "integral activity[id=" + id + "] not exists or has no start time");
-         }
-         activityList.add(integralActivity);
-     }
-     // Queue the start task of each activity (activityList is index-aligned with idList).
-     for (int i = 0; i < activityList.size(); i++) {
-         integralActivityCronService.addStartCron(idList.get(i), activityList.get(i).getStartTime());
-     }
- }
2. 开启活动开始定时任务
- /** Delay before a start task that failed with an exception is run again, in milliseconds. */
- private static final long START_RETRY_DELAY_MILLIS = 5_000L;
-
- /**
-  * Schedules the task that switches an activity from NOT_START to IN_PROGRESS at its start time.
-  *
-  * <p>A start time that already lies in the past produces a non-positive delay, which the
-  * executor treats as a request for immediate execution.
-  *
-  * @param id        activity id
-  * @param startTime time at which the activity should start
-  */
- public void addStartCron(long id, Date startTime) {
-     scheduleStartCron(id, startTime, startTime.getTime() - System.currentTimeMillis());
- }
-
- /** Queues the start task with the given delay and records it so that it can be cancelled. */
- private void scheduleStartCron(long id, Date startTime, long delayMillis) {
-     ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(
-             () -> executeStartCron(id, startTime), delayMillis, TimeUnit.MILLISECONDS);
-     currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
- }
-
- /** Body of the start task: takes the Redis lock, re-validates the activity and starts it. */
- private void executeStartCron(long id, Date startTime) {
-     log.debug("integral activity[id=" + id + "] start cron begin execute");
-     String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
-     String value = UUID.randomUUID().toString().replace("-", "");
-     if (!getExecuteLock(key, value)) {
-         return;
-     }
-     boolean needRetry = false;
-     try {
-         IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
-         queryDto.setId(id);
-         List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
-         IntegralActivity activity = CollectionUtils.isEmpty(activityList) ? null : activityList.get(0);
-         // Skip when the activity is gone, is no longer NOT_START, or was re-timed after scheduling.
-         if (activity == null
-                 || !ActivityStatusEnum.NOT_START.getCode().equals(activity.getActivityStatus())
-                 || startTime.getTime() != activity.getStartTime().getTime()) {
-             log.warn("when execute start cron, integral activity[id=" + id + "] not exists or status not match");
-             return;
-         }
-         // Switch the activity status.
-         integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activity.getEndTime());
-         // Replace any previously registered end task with a fresh one.
-         clearCronById(ACTIVITY_END_CRON_PREFX + id);
-         addEndCron(id, activity.getEndTime());
-     } catch (Exception e) {
-         log.error("integral activity[id=" + id + "] start error, retry", e);
-         needRetry = true;
-     } finally {
-         // Single release point for every exit path; a failed release must not prevent the retry.
-         try {
-             lockUtils.unLock(key, value);
-         } catch (Exception e) {
-             log.warn("redis lock[key=" + key + "] release error", e);
-         }
-     }
-     if (needRetry) {
-         // Retry after a pause: re-using the elapsed start time would re-run immediately and
-         // spin for as long as the failure persists.
-         scheduleStartCron(id, startTime, START_RETRY_DELAY_MILLIS);
-     }
- }
3. 开启定时任务时要加锁,防止并发请求
- /**
-  * Tries to take the Redis execution lock, making up to five back-to-back attempts.
-  *
-  * <p>The lock is requested with a ten-minute expiry. An attempt that throws is logged and
-  * counted as a failed attempt.
-  *
-  * @param key   lock key
-  * @param value value identifying this holder; the same value is needed to release the lock
-  * @return {@code true} once an attempt succeeds, {@code false} if all five fail
-  */
- private boolean getExecuteLock(String key, String value) {
-     for (int attempt = 1; attempt <= 5; attempt++) {
-         try {
-             if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
-                 return true;
-             }
-         } catch (Exception e) {
-             // Pass the exception along so the cause of the Redis failure shows up in the log.
-             log.warn("redis lock[key=" + key + "] get error, continue", e);
-         }
-     }
-     return false;
- }
4. 开启活动结束定时任务
- /** Delay before an end task that failed with an exception is run again, in milliseconds. */
- private static final long END_RETRY_DELAY_MILLIS = 5_000L;
-
- /**
-  * Schedules the task that finishes an activity and takes it off the shelf at its end time.
-  *
-  * <p>An end time that already lies in the past produces a non-positive delay, which the
-  * executor treats as a request for immediate execution.
-  *
-  * @param id      activity id
-  * @param endTime time at which the activity should end
-  */
- private void addEndCron(long id, Date endTime) {
-     scheduleEndCron(id, endTime, endTime.getTime() - System.currentTimeMillis());
- }
-
- /** Queues the end task with the given delay and records it so that it can be cancelled. */
- private void scheduleEndCron(long id, Date endTime, long delayMillis) {
-     ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(
-             () -> executeEndCron(id, endTime), delayMillis, TimeUnit.MILLISECONDS);
-     currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
- }
-
- /** Body of the end task: re-validates the activity, marks it FINISHED and unpublishes it. */
- private void executeEndCron(long id, Date endTime) {
-     // Logged when the task actually runs, not when it is scheduled.
-     log.debug("integral activity[id=" + id + "] end cron begin execute");
-     try {
-         IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
-         queryDto.setId(id);
-         List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
-         IntegralActivity activity = CollectionUtils.isEmpty(activityList) ? null : activityList.get(0);
-         // Skip when the activity is gone, is not IN_PROGRESS, or was re-timed after scheduling.
-         if (activity == null
-                 || !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activity.getActivityStatus())
-                 || endTime.getTime() != activity.getEndTime().getTime()) {
-             return;
-         }
-         // NOTE(review): the two updates are not atomic; if the second one fails, the retry sees
-         // FINISHED and returns without changing the publish status.
-         integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
-         integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
-     } catch (Exception e) {
-         log.error("integral activity[id=" + id + "] end error, retry", e);
-         // Retry after a pause: re-using the elapsed end time would re-run immediately and spin
-         // for as long as the failure persists.
-         scheduleEndCron(id, endTime, END_RETRY_DELAY_MILLIS);
-     }
- }
5. 活动下架
- /**
-  * Takes the given activities off the shelf.
-  *
-  * <p>Marks them as stopped early (stamped with the current time), resets their publish status,
-  * and then cancels the start and end tasks registered for each id.
-  *
-  * @param idList ids of the activities to take down
-  */
- @Override
- @Transactional(rollbackFor = Exception.class)
- public void offIntegralActivity(List<Long> idList) {
-     final String operator = SessionUtils.getAccountNameSession();
-     integralActivityDao.updateActivityStatus(
-             idList, ActivityStatusEnum.EARLY_STOP.getCode(), null, operator, new Date());
-     integralActivityDao.updatePublishStatus(
-             idList, PublishStatusEnum.PUBLISH_NO.getCode(), operator);
-     // Drop the scheduled tasks of every activity that was taken down.
-     idList.forEach(activityId -> integralActivityCronService.clearActivityCron(activityId));
- }
6. 活动下架后要清除定时任务
- /**
-  * Cancels both tasks of an activity: first its start task, then its end task.
-  *
-  * @param id activity id
-  */
- public void clearActivityCron(long id) {
-     final String[] cronPrefixes = {ACTIVITY_START_CRON_PREFX, ACTIVITY_END_CRON_PREFX};
-     for (String cronPrefix : cronPrefixes) {
-         this.clearCronById(cronPrefix + id);
-     }
- }
- /**
-  * Unregisters the task stored under {@code cronId} and cancels it if it has not completed.
-  *
-  * <p>The entry is taken out of the map in a single {@code remove} call, so the future that gets
-  * cancelled is always the one that was unregistered.
-  *
-  * @param cronId prefixed task id
-  */
- private void clearCronById(String cronId) {
-     Future<?> future = currentCronMap.remove(cronId);
-     if (future == null) {
-         log.debug("cron[id=" + cronId + "] not exists");
-         return;
-     }
-     if (!future.isDone()) {
-         // cancel(true) also interrupts the task if it is running at this moment.
-         future.cancel(true);
-     }
- }
7. 程序启动时开启全部活动的定时任务
- @Component
- @Slf4j
- public class InitIntegralActivityCron implements CommandLineRunner {
-
- @Autowired
- private LockUtils lockUtils;
- @Autowired
- private IntegralActivityCronService integralActivityCronService;
- @Override
- public void run(String... args) throws Exception {
- // 初始化积分活动定时任务
- String key = RedisConstant.LOAD_INTEGRAL_ACTIVITY_CRON;
- String value = UUID.randomUUID().toString().replace("-", "");
- if (!lockUtils.getLock(key, value, 30 * 60 * 1000)) {
- log.info("integral activity cron load lock get failed, return!!!");
- return;
- }
- try {
- if (!integralActivityCronService.createAllActivityCron()) {
- lockUtils.unLock(key, value);
- log.error("partial integral activity cron load error, system will exit");
- System.exit(0);
- }
- } catch (Exception e) {
- log.error("integral activity cron load error, system will exit", e);
- lockUtils.unLock(key, value);
- System.exit(0);
- }
- }
- }
- @Override
- public boolean createAllActivityCron() {
-     // Schedules a task for every published activity that is NOT_START (start task) or
-     // IN_PROGRESS (end task). Returns true only if every task was created.
-     boolean isAllSuccess = true;
-     IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
-     queryDto.setActivityStatusList(Lists.newArrayList(
-             ActivityStatusEnum.NOT_START.getCode(), ActivityStatusEnum.IN_PROGRESS.getCode()));
-     queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
-     List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
-     // Counted separately so the summary log below cannot dereference a null list.
-     int activityCount = 0;
-     if (CollectionUtils.isNotEmpty(activityList)) {
-         activityCount = activityList.size();
-         for (IntegralActivity integralActivity : activityList) {
-             long id = integralActivity.getId();
-             try {
-                 if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
-                     this.addStartCron(id, integralActivity.getStartTime());
-                 } else {
-                     this.addEndCron(id, integralActivity.getEndTime());
-                 }
-             } catch (Exception e) {
-                 // One failing activity must not stop the others from being scheduled.
-                 isAllSuccess = false;
-                 log.error("integral activity[id=" + id + "] cron create error", e);
-             }
-         }
-     }
-     log.info("create integral activity cron end, activity count =" + activityCount + " " + (isAllSuccess ? "all success" : "partial success"));
-     return isAllSuccess;
- }
复制代码
上面是定时任务的完整流程,下面补充完整代码:
1. 定时任务完整代码
- @Service
- @Slf4j
- public class IntegralActivityCronServiceImpl implements IntegralActivityCronService {
-
- // 执行定时对象池
- private ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(2);
- // 所有的定时任务
- @SuppressWarnings("rawtypes")
- private static Map<String, Future> currentCronMap = new HashMap<String, Future>();
- /**
- * 活动开始定时任务ID前缀
- */
- private static final String ACTIVITY_START_CRON_PREFX = "ACTIVITY_START:";
- /**
- * 活动结束定时任务ID前缀
- */
- private static final String ACTIVITY_END_CRON_PREFX = "ACTIVITY_END:";
- @Autowired
- private IntegralActivityDao integralActivityDao;
- @Autowired
- private LockUtils lockUtils;
- @Override
- public boolean createAllActivityCron() {
- boolean isAllSuccess = true;// 是否都创建成功
- List<String> activityStatusList = new ArrayList<String>();
- activityStatusList.add(ActivityStatusEnum.NOT_START.getCode());
- activityStatusList.add(ActivityStatusEnum.IN_PROGRESS.getCode());
- IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
- queryDto.setActivityStatusList(activityStatusList);
- queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
- List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
- if (CollectionUtils.isNotEmpty(activityList)) {
- for (IntegralActivity integralActivity : activityList) {
- long id = integralActivity.getId();
- try {
- if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
- this.addStartCron(id, integralActivity.getStartTime());
- } else {
- this.addEndCron(id, integralActivity.getEndTime());
- }
- } catch (Exception e) {
- isAllSuccess = false;
- log.error("integral activity[id=" + id + "] cron create error", e);
- }
- }
- }
- log.info("create integral activity cron end, activity count =" + activityList.size() + " " + (isAllSuccess ? "all success" : "partial success"));
- return isAllSuccess;
- }
- /**
- * 添加启动定时任务
- *
- * @param id
- * @param startTime
- */
- public void addStartCron(long id, Date startTime) {
- long millSecond = startTime.getTime() - System.currentTimeMillis();
- ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
- @Override
- public void run() {
- log.debug("integral activity[id=" + id + "] start cron begin execute");
- String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
- String value = UUID.randomUUID().toString().replace("-", "");
- if (!getExecuteLock(key, value)) {
- return;
- }
- try {
- IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
- queryDto.setId(id);
- List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
- if (CollectionUtils.isEmpty(activityList) ||
- !ActivityStatusEnum.NOT_START.getCode().equals(activityList.get(0).getActivityStatus()) ||
- startTime.getTime() != activityList.get(0).getStartTime().getTime()) {
- log.warn("when execute start cron, integral activity[id=" + id + "] not exists or status not match");
- lockUtils.unLock(key, value);
- return;
- }
- // 更改活动状态
- integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activityList.get(0).getEndTime());
- // 清除活动结束定时任务
- clearCronById(ACTIVITY_END_CRON_PREFX + id);
- // 添加活动结束定时任务
- addEndCron(id, activityList.get(0).getEndTime());
- lockUtils.unLock(key, value);
- } catch (Exception e) {
- log.error("integral activity[id=" + id + "] start error, retry", e);
- lockUtils.unLock(key, value);
- addStartCron(id, startTime);
- }
- }
- }, millSecond, TimeUnit.MILLISECONDS);
- // 加入任务
- currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
- }
- /**
- * 添加结束定时任务
- *
- * @param id
- * @param endTime
- */
- private void addEndCron(long id, Date endTime) {
- long millSecond = endTime.getTime() - System.currentTimeMillis();
- log.debug("integral activity[id=" + id + "] end cron begin execute");
- ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
- @Override
- public void run() {
- try {
- IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
- queryDto.setId(id);
- List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
- if (CollectionUtils.isEmpty(activityList) ||
- !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activityList.get(0).getActivityStatus()) ||
- endTime.getTime() != activityList.get(0).getEndTime().getTime()) {
- return;
- }
- // 更改活动状态
- integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
- // 更改上架状态
- integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
- } catch (Exception e) {
- log.error("integral activity[id=" + id + "] end error, retry", e);
- addEndCron(id, endTime);
- }
- }
- }, millSecond, TimeUnit.MILLISECONDS);
- // 加入任务
- currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
- }
- /**
- * 清除活动定时任务
- *
- * @param id
- */
- public void clearActivityCron(long id) {
- this.clearCronById(ACTIVITY_START_CRON_PREFX + id);
- this.clearCronById(ACTIVITY_END_CRON_PREFX + id);
- }
- /**
- * 通过id清除任务
- *
- * @param cronId
- */
- @SuppressWarnings("rawtypes")
- private void clearCronById(String cronId) {
- if (currentCronMap.get(cronId) == null) {
- log.debug("cron[id=" + cronId + "] not exists");
- }
- Future future = currentCronMap.get(cronId);
- if (null != future && !future.isDone()) {
- future.cancel(true); // 正在运行是否干扰
- }
- currentCronMap.remove(cronId);
- }
-
- /**
- * 获取活动执行锁
- *
- * @param key
- * @param value
- * @return
- */
- private boolean getExecuteLock(String key, String value) {
- for (int i = 1; i <= 5; i++) {
- try {
- if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
- return true;
- }
- } catch (Exception e) {
- log.warn("redis lock[key=" + key + "] get error, continue");
- }
- }
- return false;
- }
- }
2. Redis工具类
- @Component()
- public class LockUtils {
- @Resource
- private RedisTemplate<String, Object> redisTemplate;
- public LockUtils() {
- }
- public boolean getLock(String key) {
- return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", Duration.ofSeconds(60L));
- }
- public boolean getLock(String key, String value) {
- return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, Duration.ofSeconds(60L));
- }
- public boolean unLock(String key) {
- return this.redisTemplate.delete("lock_key_" + key);
- }
- public boolean unLock(String key, String value) {
- if (value == null) {
- return this.unLock(key);
- } else {
- return value.equals(this.redisTemplate.opsForValue().get("lock_key_" + key)) ? this.redisTemplate.delete("lock_key_" + key) : false;
- }
- }
- public boolean getLock(String key, long expireTime) {
- return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", expireTime, TimeUnit.MILLISECONDS);
- }
- public boolean getLock(String key, String value, long expireTime) {
- return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, expireTime, TimeUnit.MILLISECONDS);
- }
- }
3. 活动状态枚举类
- public enum ActivityStatusEnum {
- NOT_START("1", "未开始"),
- IN_PROGRESS("2", "进行中"),
- FINISHED("3", "已结束"),
- EARLY_STOP("4", "提前结束");
-
- private String code;
- private String value;
- private ActivityStatusEnum(String code, String value) {
- this.code = code;
- this.value = value;
- }
- public String getCode() {
- return code;
- }
- public void setCode(String code) {
- this.code = code;
- }
- public String getValue() {
- return value;
- }
- public void setValue(String value) {
- this.value = value;
- }
- /**
- * 根据code获取value
- * @param code
- * @return
- */
- public static String getValueByCode(String code) {
- if (StringUtils.isEmpty(code)) {
- return "";
- }
- for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
- if (item.getCode().equals(code)) {
- return item.getValue();
- }
- }
- return "";
- }
-
- /**
- * 根据code获取value
- * @param code
- * @return
- */
- public static String getCodeByValue(String value) {
- if (StringUtils.isEmpty(value)) {
- return "";
- }
- for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
- if (item.getValue().equals(value)) {
- return item.getCode();
- }
- }
- return "";
- }
- }
其它业务代码没有参考价值,不再贴出。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com(ToB企服之家,中国第一个企服评测及商务社交产业平台)。