Spring Boot异步请求处理框架

打印 上一主题 下一主题

主题 877|帖子 877|积分 2631

Spring Boot异步请求处理框架

1、前言
  1. ​        在Spring Boot项目中,经常会遇到处理时间过长,导致出现HTTP请求超时问题,状态码:502。
  2. ​        例如一个文件导入接口需要导入一个Excel文件的学员记录,原来是针对一个班的学员,最多500条记录,1分钟的HTTP超时时长之内基本可以响应。现在将很多班级的学员混在一起,放在一个Excel文件中(这样可以提高操作人员的工作效率),比如5万条学员记录,于是就出现HTTP请求超时问题。
  3. ​        ​解决方案有:1)Ajax异步请求方式;2)WebSocket方式;3)异步请求处理方式:请求+轮询。
  4. ​        方案1,需要调整HTTP超时设置,Spring Boot开启异步处理(使用@EnableAsync和@Async),这种方式,问题是超时时长要设置多大,没有底。
  5. ​        方案2,需要前端支持Web2.0,对浏览器有所限制,代码也变得复杂。
  6. ​        方案3,使用异步请求处理。所谓异步请求处理,就是将请求异步化,前端发起请求后,后端很快就响应,返回一个任务ID,前端再用这个任务ID去轮询,获取处理进程和结果信息。需要两个接口:任务请求接口和任务信息轮询接口。
  7. ​        显然,对于长时间的业务处理,通过轮询,获取处理进程信息,可以获得较好的用户体验。正如大文件下载,用户可以了解下载的进度一样,业务处理同样可以通过输出处理日志信息和进度,使得长时间业务处理过程可视化,而不至于让用户长时间面对一个在空转鼠标符号。
  8. ​        本文针对方案3,提出一种通用的处理框架。使用这个通用的异步请求处理框架,可以适应各种不同的需要长时间异步处理的业务需求。
复制代码
2、异步请求处理框架描述
  1. ​        本异步请求处理框架,主要包括任务信息、任务执行类(Runnable)、任务管理器。
复制代码
2.1、任务信息对象类TaskInfo
  1. ​        任务信息对象,用于存储任务信息,其生命周期为:创建任务==>加入任务队列==>加入线程池工作线程队列==>任务执行==>任务执行完成==>任务对象缓存超期销毁。
  2. ​        1)任务识别信息:
  3. ​        1.1)任务ID:任务ID用于识别任务信息,一个任务ID对应一个任务,是全局唯一的,不区分任务类型。
  4. ​        1.2)任务名称:即任务类型的名称,对应于业务处理类型名称,如查询商品单价、查询商品库存等,这样可方便可视化识别任务。一个任务名称可以有多个任务实例。
  5. ​        1.3)会话ID(sessionId):用于身份识别,这样只有请求者才能根据返回的任务ID来查询任务信息,其它用户无权访问。想象一下同步请求,谁发起请求,响应给谁。
  6. ​        2)任务调用信息:使得任务管理器可以使用反射方法,调用任务处理方法。
  7. ​        2.1)任务处理对象:这是业务处理对象,为Object类型,一般为Service实现类对象。
  8. ​        2.2)任务处理方法:这是一个Method对象类型,为异步处理的业务处理方法对象。这个方法必须是public的方法。
  9. ​        2.3)任务方法参数:这是一个Map<String,Object>类型字典对象,可适应任意参数结构。
  10. ​        3)任务处理过程和结果相关信息:可以提供任务处理过程和结果可视化的信息。
  11. ​        3.1)任务状态:表示任务目前的处理状态,0-未处理,1-处理中,2-处理结束。
  12. ​        3.2)处理日志:这是一个List<String>类型的字符串列表,用于存放处理日志。处理日志格式化:"time level taskId taskName --- logInfo",便于前端展示。
  13. ​        3.3)处理进度百分比:这是double类型数据,0.0-100.0,业务单元可视需要使用。
  14. ​        3.4)处理结果:这是一个Object类型对象,真实数据类型由业务单元约定。在未处理结束前,该值为null,处理结束后,如有返回值,此时赋值。
  15. ​        3.5)返回码:业务处理,可能遇到异常,如需设置返回码,此处赋值。
  16. ​        3.6)返回消息:与返回码相联系的提示信息。
  17. ​        3.7)开始处理时间戳:在任务启动(开始执行时)设置,用于计算业务处理的耗时时长。
  18. ​        4)任务缓存到期时间:任务处理完成后,任务信息会缓存一段时间(如60秒),等待前端获取,超期后,任务对象被销毁,意味着再也无法获取任务信息了。后端系统不可能累积存放超期的任务信息,否则可能导致OOM(Out Of Memory)异常。
复制代码
2.2、任务执行类TaskRunnable
  1. ​        任务执行类,实现Runnable接口,是为线程池的工作线程提供处理方法。
  2. ​        任务执行类,使用任务信息对象作为参数,并调用任务信息的任务调用信息,使用反射方法,来执行任务处理。
复制代码
2.3、任务管理器类TaskManService
  1. ​        任务管理器,全局对象,使用@Service注解,加入Spring容器。这样,任何需要异步处理的业务都可以访问任务管理器。
  2. ​        任务管理器,包含下列属性:
  3. ​        1)任务队列:LinkedBlockingQueue<TaskInfo>类型,考虑到OOM问题,容量使用有限值,如1万,即最大缓存1万个任务,相当于二级缓存。
  4. ​        2)任务信息字典:Map<Integer,TaskInfo>类型,key为taskId,目的是为了方便根据taskId快速查询任务信息。
  5. ​        3)线程池:ThreadPoolExecutor类型,工作线程队列长度为线程池的最大线程数,相当于一级缓存。可以设置核心线程数,最大线程数,工作线程队列长度等参数,如设置核心线程数为5,最大线程数为100,工作线程队列长度为100。线程工厂ThreadFactory使用Executors.defaultThreadFactory()。
  6. ​        4)任务ID计数器:AtomicInteger类型,用于分配唯一的任务ID。
  7. ​        5)监视线程:用于任务调度,以及检查缓存到期时间超期的已结束任务信息。
  8. ​        6)监视线程的执行类对象:Runnable对象,提供监视线程的执行方法。
  9. ​        7)上次检查时间戳:用于检查缓存到期时间,每秒1次检查。
  10. ​        任务管理器,包含下列接口方法:
  11. ​        1)添加任务:addTask,获取sessionId,检查任务处理对象、方法及参数是否为null,然后分配任务ID,创建任务对象,加入任务队列。如果参数为void,也需要构造一个空的Map<String,Object>字典对象。如果任务队列未满,就将任务加入任务队列中,并返回包含任务ID的字典,否则抛出“任务队列已满”的异常信息。
  12. ​        2)获取任务信息:getTaskInfo,参数为request和任务ID,如果sessionId与请求时相同,且任务对象能在任务信息字典中找到,就返回任务信息对象,否则抛出相关异常。
  13. ​        任务管理器的核心方法:
  14. ​        1)初始化:使用@PostConstruct注解,启动监视线程,并预启动线程池的一个核心线程。
  15. ​        2)监视线程的执行类run方法:实现每秒一次的超期已处理结束的任务信息的检查,以及任务调度。任务调度方法:
  16. ​        2.1)如果任务队列非空,且线程池未满,则取出一个任务信息对象,并创建一个任务执行类对象,加入到线程池的工作线程队列(execute方法加入)。
  17. ​        2.2)如果任务队列非空,且线程池已满,则等待100毫秒。
  18. ​        2.3)如果任务队列为空,则等待100毫秒。
复制代码
3、异步请求处理框架代码

3.1、任务信息对象类TaskInfo
  1. ​        任务信息对象类TaskInfo,代码如下:
复制代码
  1. package com.abc.example.asyncproc;
  2. import java.lang.reflect.Method;
  3. import java.time.LocalDateTime;
  4. import java.time.format.DateTimeFormatter;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Map;
  8. import lombok.Data;
  9. /**
  10. * @className        : TaskInfo
  11. * @description        : 任务信息
  12. * @summary        :
  13. * @history        :
  14. * ------------------------------------------------------------------------------
  15. * date                        version                modifier                remarks                  
  16. * ------------------------------------------------------------------------------
  17. * 2022/08/17        1.0.0                sheng.zheng                初版
  18. *
  19. */
  20. @Data
  21. public class TaskInfo {
  22.        
  23.         // ////////////////////////////////////////////////
  24.         // 任务识别信息
  25.        
  26.         // 任务ID
  27.         private Integer taskId = 0;
  28.        
  29.         // sessionId,用于识别请求者
  30.         private String sessionId = "";
  31.         // 任务名称,即业务处理的名称,如查询商品最低价,导入学员名册
  32.         private String taskName = "";
  33.        
  34.         // ////////////////////////////////////////////////
  35.         // 任务执行相关的
  36.        
  37.         // 请求参数,使用字典进行封装,以便适应任意数据结构
  38.         private Map<String, Object> params;
  39.        
  40.         // 处理对象,一般是service对象
  41.         private Object procObject;
  42.        
  43.         // 处理方法
  44.         private Method method;
  45.        
  46.         // ////////////////////////////////////////////////
  47.         // 任务处理产生的数据,中间数据,结果
  48.        
  49.         // 处理状态,0-未处理,1-处理中,2-处理结束
  50.         private int procStatus = 0;
  51.        
  52.         // 处理结果,数据类型由业务单元约定
  53.         private Object result;
  54.         // 处理日志,包括中间结果,格式化显示:Time level taskId taskName logInfo
  55.         private List<String> logList = new ArrayList<String>();       
  56.        
  57.         // 处理进度百分比
  58.         private double progress = 0;       
  59.        
  60.         // 到期时间,UTC,任务完成后才设置,超时后销毁
  61.         private long expiredTime = 0;       
  62.         // 返回码,保留,0表示操作成功
  63.         private int resultCode = 0;
  64.        
  65.         // 响应消息,保留
  66.         private String message = "";
  67.        
  68.         // 开始处理时间,便于统计任务处理时长
  69.         private long startTime = 0;       
  70.        
  71.         // ////////////////////////////////////////////////
  72.         // 日志相关的方法
  73.         private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
  74.                
  75.         // 添加处理日志
  76.         public void addLogInfo(String level,String logInfo) {
  77.                 // 格式化显示:Time level taskId taskName logInfo
  78.                 LocalDateTime current = LocalDateTime.now();
  79.                 String strCurrent = current.format(df);
  80.                 String log = String.format("%s %s %d %s --- %s",
  81.                                 strCurrent,level,taskId,taskName,logInfo);
  82.                 logList.add(log);
  83.         }
  84.        
  85.        
  86.         // ////////////////////////////////////////////////
  87.         // 不同状态的参数设置接口
  88.        
  89.         // 设置任务初始化,未开始
  90.         public void init(Integer taskId,String taskName,String sessionId,
  91.                         Object procObject,Method method,Map<String, Object> params) {
  92.                 this.procStatus = 0;
  93.                 this.taskId = taskId;
  94.                 this.taskName = taskName;
  95.                 this.sessionId = sessionId;
  96.                 this.procObject = procObject;
  97.                 this.method = method;
  98.                 this.params = params;
  99.         }
  100.        
  101.         // 启动任务
  102.         public void start() {
  103.                 this.procStatus = 1;
  104.                 addLogInfo(TaskConstants.LEVEL_INFO,"开始处理任务...");
  105.                 // 记录任务开始处理的时间
  106.                 startTime = System.currentTimeMillis();
  107.         }
  108.        
  109.         // 结束任务
  110.         public void finish(Object result) {
  111.                 this.result = result;
  112.                 this.procStatus = 2;
  113.                 // 设置结果缓存的到期时间
  114.                 long current = System.currentTimeMillis();
  115.                 this.expiredTime = current + TaskConstants.PROC_EXPIRE_TIME;
  116.                 long duration = 0;
  117.                 double second = 0.0;
  118.                 duration = current - startTime;
  119.                 second = duration / 1000.0;
  120.                 addLogInfo(TaskConstants.LEVEL_INFO,"任务处理结束,耗时(s):"+second);
  121.         }
  122.        
  123.         // 处理异常
  124.         public void error(int resultCode,String message) {
  125.                 this.resultCode = resultCode;
  126.                 this.message = message;
  127.                 this.procStatus = 2;
  128.         }
  129. }
复制代码
  1. ​        说明:任务信息对象类TaskInfo提供了几个常用的处理方法,如addLogInfo、init、start、finish、error,便于简化属性值设置。
复制代码
3.2、任务执行类TaskRunnable
  1. ​        任务执行类TaskRunnable,代码如下:
复制代码
  1. package com.abc.example.asyncproc;
  2. import java.lang.reflect.InvocationTargetException;
  3. import java.lang.reflect.Method;
  4. import com.abc.example.common.utils.LogUtil;
  5. import com.abc.example.exception.BaseException;
  6. import com.abc.example.exception.ExceptionCodes;
  7. /**
  8. * @className        : TaskRunnable
  9. * @description        : 可被线程执行的任务执行类
  10. * @summary        :
  11. * @history        :
  12. * ------------------------------------------------------------------------------
  13. * date                        version                modifier                remarks                  
  14. * ------------------------------------------------------------------------------
  15. * 2022/08/17        1.0.0                sheng.zheng                初版
  16. *
  17. */
  18. public class TaskRunnable implements Runnable {
  19.         // 任务信息
  20.         private TaskInfo taskInfo;
  21.        
  22.         public TaskRunnable(TaskInfo taskInfo) {
  23.                 this.taskInfo = taskInfo;
  24.         }
  25.        
  26.         // 获取任务ID
  27.         public Integer getTaskId() {
  28.                 if (taskInfo != null) {
  29.                         return taskInfo.getTaskId();
  30.                 }
  31.                 return 0;
  32.         }
  33.                
  34.         @Override
  35.         public void run() {
  36.                 Object procObject = taskInfo.getProcObject();
  37.                 Method method = taskInfo.getMethod();
  38.                                
  39.                 try {
  40.                         // 使用反射方法,调用方法来处理任务
  41.                         method.invoke(procObject, taskInfo);
  42.                 }catch(BaseException e) {
  43.                         // 优先处理业务处理异常
  44.                         taskInfo.error(e.getCode(),e.getMessage());
  45.                         LogUtil.error(e);
  46.                 }catch(InvocationTargetException e) {
  47.                         taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
  48.                         LogUtil.error(e);
  49.                 }catch(IllegalAccessException e) {
  50.                         taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
  51.                         LogUtil.error(e);                       
  52.                 }catch(IllegalArgumentException e) {
  53.                         taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
  54.                         LogUtil.error(e);                       
  55.                 }catch(Exception e) {
  56.                         // 最后处理未知异常
  57.                         taskInfo.error(ExceptionCodes.ERROR.getCode(),e.getMessage());
  58.                         LogUtil.error(e);                       
  59.                 }
  60.         }
  61. }
复制代码
3.3、任务常量类TaskConstants
  1. ​        任务常量类TaskConstants,提供异步请求处理框架模块的相关常量设置,代码如下:
复制代码
  1. package com.abc.example.asyncproc;
  2. /**
  3. * @className        : TaskConstants
  4. * @description        : 任务处理相关常量
  5. * @summary        :
  6. * @history        :
  7. * ------------------------------------------------------------------------------
  8. * date                        version                modifier                remarks                  
  9. * ------------------------------------------------------------------------------
  10. * 2022/08/18        1.0.0                sheng.zheng                初版
  11. *
  12. */
  13. public class TaskConstants {
  14.     // 任务缓存过期时间,单元毫秒,即任务处理完成后,设置此时长,超期销毁
  15.     public static final int PROC_EXPIRE_TIME = 60000;  
  16.    
  17.     // 线程池核心线程数
  18.     public static final int CORE_POOL_SIZE = 5;
  19.     // 线程池最大线程数
  20.     public static final int MAX_POOL_SIZE  = 100;
  21.    
  22.     // 线程池KeepAlive参数,单位秒
  23.     public static final long KEEP_ALIVE_SECONDS = 10;   
  24.    
  25.     // 任务队列最大数目
  26.     public static final int MAX_TASK_NUMS  = 10000;   
  27.    
  28.     // 日志信息告警等级
  29.     public static final String LEVEL_INFO = "INFO";   
  30.     public static final String LEVEL_ERROR = "ERROR";     
  31. }
复制代码
3.4、任务管理器类TaskManService
  1. ​        任务管理器类TaskManService,代码如下:
复制代码
  1. package com.abc.example.asyncproc;
  2. import java.lang.reflect.Method;
  3. import java.util.HashMap;
  4. import java.util.Iterator;
  5. import java.util.Map;
  6. import java.util.concurrent.BlockingQueue;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.LinkedBlockingQueue;
  9. import java.util.concurrent.ThreadPoolExecutor;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.atomic.AtomicInteger;
  12. import javax.annotation.PostConstruct;
  13. import javax.servlet.http.HttpServletRequest;
  14. import org.springframework.stereotype.Service;
  15. import com.abc.example.common.utils.LogUtil;
  16. import com.abc.example.exception.BaseException;
  17. import com.abc.example.exception.ExceptionCodes;
  18. /**
  19. * @className        : TaskManService
  20. * @description        : 任务管理器
  21. * @summary        :
  22. * @history        :
  23. * ------------------------------------------------------------------------------
  24. * date                        version                modifier                remarks                  
  25. * ------------------------------------------------------------------------------
  26. * 2022/08/18        1.0.0                sheng.zheng                初版
  27. *
  28. */
  29. @Service
  30. public class TaskManService {
  31.         // 任务队列,考虑OOM(Out Of Memory)问题,限定任务队列长度,相当于二级缓存
  32.         private BlockingQueue<TaskInfo> taskQueue =
  33.                         new LinkedBlockingQueue<TaskInfo>(TaskConstants.MAX_TASK_NUMS);
  34.        
  35.         // 任务信息字典,key为taskId,目的是为了方便根据taskId查询任务信息
  36.         private Map<Integer,TaskInfo> taskMap = new HashMap<Integer,TaskInfo>();
  37.        
  38.         // 线程池,工作线程队列长度为线程池的最大线程数,相当于一级缓存
  39.         private ThreadPoolExecutor executor = new ThreadPoolExecutor(
  40.                         TaskConstants.CORE_POOL_SIZE,
  41.                         TaskConstants.MAX_POOL_SIZE,
  42.                         TaskConstants.KEEP_ALIVE_SECONDS,
  43.                         TimeUnit.SECONDS,
  44.                         new LinkedBlockingQueue<>(TaskConstants.MAX_POOL_SIZE),
  45.                         Executors.defaultThreadFactory());
  46.                
  47.         // 任务ID计数器,累加
  48.         private AtomicInteger taskIdCounter = new AtomicInteger();
  49.        
  50.         // 用于缓存上次检查时间
  51.         private long lastTime = 0;       
  52.        
  53.         // 监视线程,用于任务调度,以及检查已结束任务的缓存到期时间
  54.         private Thread monitor;
  55.         @PostConstruct
  56.         public void init(){
  57.                 // 启动线程实例
  58.                 monitor = new Thread(checkRunnable);
  59.                 monitor.start();
  60.                
  61.                 // 启动一个核心线程
  62.                 executor.prestartCoreThread();
  63.         }                       
  64.        
  65.         // 检查已结束任务的缓存到期时间,超期的销毁
  66.         private Runnable checkRunnable = new Runnable() {
  67.                 @Override
  68.         public void run() {
  69.                         while (true) {
  70.                                 long current = System.currentTimeMillis();
  71.                                 if(current - lastTime >= 1000) {
  72.                                         // 离上次检查时间超过1秒
  73.                                         checkAndremove();
  74.                                         // 更新lastTime
  75.                                         lastTime = current;
  76.                                 }
  77.                                 synchronized(this) {
  78.                                         try {
  79.                                                 // 检查任务队列
  80.                                                 if(taskQueue.isEmpty()) {
  81.                                                         // 如果任务队列为空,则等待100ms
  82.                                                         Thread.sleep(100);
  83.                                                 }else {
  84.                                                         // 如果任务队列不为空
  85.                                                         // 检查线程池队列
  86.                                                         if (executor.getQueue().size() < TaskConstants.MAX_POOL_SIZE) {
  87.                                                                 // 如果线程池队列未满
  88.                                                                 // 从任务队列中获取一个任务
  89.                                                                 TaskInfo taskInfo = taskQueue.take();
  90.                                                                 // 创建Runnable对象
  91.                                                                 TaskRunnable tr = new TaskRunnable(taskInfo);
  92.                                                                 // 调用线程池执行任务
  93.                                                                 executor.execute(tr);
  94.                                                         }else {
  95.                                                                 // 如果线程池队列已满,则等待100ms
  96.                                                                 Thread.sleep(100);
  97.                                                         }
  98.                                                 }                                                                                       
  99.                                         }catch (InterruptedException e) {
  100.                                                 LogUtil.error(e);
  101.                                         }
  102.                                 }
  103.                         }
  104.                 }
  105.         };
  106.        
  107.         /**
  108.          *
  109.          * @methodName        : checkAndremove
  110.          * @description        : 检查并移除过期对象
  111.          * @history        :
  112.          * ------------------------------------------------------------------------------
  113.          * date                        version                modifier                remarks                  
  114.          * ------------------------------------------------------------------------------
  115.          * 2022/08/15        1.0.0                sheng.zheng                初版
  116.          *
  117.          */
  118.         private void checkAndremove() {
  119.                 synchronized(taskMap) {
  120.                         if (taskMap.size() == 0) {
  121.                                 // 如果无对象
  122.                                 return;
  123.                         }
  124.                         long current = System.currentTimeMillis();
  125.                         Iterator<Map.Entry<Integer,TaskInfo>> iter = taskMap.entrySet().iterator();
  126.                         while(iter.hasNext()) {
  127.                                 Map.Entry<Integer,TaskInfo> entry = iter.next();
  128.                                 TaskInfo taskInfo = entry.getValue();
  129.                                 long expiredTime = taskInfo.getExpiredTime();
  130.                                 if ((expiredTime != 0) && ((current - expiredTime) > TaskConstants.PROC_EXPIRE_TIME)) {
  131.                                         // 如果过期,移除
  132.                                         iter.remove();
  133.                                 }
  134.                         }                                       
  135.                 }
  136.         }       
  137.        
  138.         /**
  139.          *
  140.          * @methodName                : addTask
  141.          * @description         : 添加任务
  142.          * @param request        : request对象
  143.          * @param taskName        : 任务名称
  144.          * @param procObject    : 处理对象
  145.          * @param method        : 处理方法
  146.          * @param params        : 方法参数,透明传递到处理方法中
  147.          * @return                : 处理ID,唯一标识该请求的处理
  148.          * @history                :
  149.          * ------------------------------------------------------------------------------
  150.          * date                        version                modifier                remarks                  
  151.          * ------------------------------------------------------------------------------
  152.          * 2022/08/19        1.0.0                sheng.zheng                初版
  153.          *
  154.          */
  155.         public Integer addTask(HttpServletRequest request,
  156.                         String taskName,Object procObject,Method method,
  157.                         Map<String, Object> params) {
  158.                 // 获取sessionId
  159.                 String sessionId = null;
  160.                 if (request.getSession() != null) {
  161.                         sessionId = request.getSession().getId();
  162.                 }else {
  163.                         // 无效的session
  164.                         throw new BaseException(ExceptionCodes.SESSION_IS_NULL);                       
  165.                 }
  166.                
  167.                 // 空指针保护
  168.                 if (procObject == null) {
  169.                         throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,"procObject对象为null");                               
  170.                 }
  171.                 if (method == null) {
  172.                         throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,"method对象为null");                               
  173.                 }
  174.                 if (params == null) {
  175.                         throw new BaseException(ExceptionCodes.ARGUMENTS_ERROR,"params对象为null");                               
  176.                 }               
  177.                 // 获取可用的任务ID
  178.                 Integer taskId = taskIdCounter.incrementAndGet();
  179.                
  180.                 // 生成任务处理信息对象               
  181.                 TaskInfo item = new TaskInfo();
  182.                 // 初始化任务信息
  183.                 item.init(taskId,taskName,sessionId,procObject,method,params);
  184.                
  185.                 // 加入处理队列
  186.                 try {
  187.                         synchronized(taskQueue) {
  188.                                 taskQueue.add(item);                               
  189.                         }
  190.                 }catch(IllegalStateException e) {
  191.                         // 队列已满
  192.                         throw new BaseException(ExceptionCodes.ADD_OBJECT_FAILED,"任务队列已满");
  193.                 }
  194.                
  195.                 // 加入字典
  196.                 synchronized(taskMap) {
  197.                         taskMap.put(taskId, item);
  198.                 }
  199.                                
  200.                 return taskId;
  201.         }
  202.        
  203.         /**
  204.          *
  205.          * @methodName                : getTaskInfo
  206.          * @description                : 获取任务信息
  207.          * @param request        : request对象
  208.          * @param taskId        : 任务ID       
  209.          * @return                : TaskInfo对象
  210.          * @history                :
  211.          * ------------------------------------------------------------------------------
  212.          * date                        version                modifier                remarks                  
  213.          * ------------------------------------------------------------------------------
  214.          * 2022/08/19        1.0.0                sheng.zheng                初版
  215.          *
  216.          */
  217.         public TaskInfo getTaskInfo(HttpServletRequest request,Integer taskId) {
  218.                 TaskInfo item = null;
  219.                 synchronized(taskMap) {
  220.                         if (taskMap.containsKey(taskId)) {
  221.                                 item = taskMap.get(taskId);
  222.                                 String sessionId = request.getSession().getId();
  223.                                 if (!sessionId.equals(item.getSessionId())) {
  224.                                         throw new BaseException(ExceptionCodes.TASKID_NOT_RIGHTS);                               
  225.                                 }
  226.                         }else {
  227.                                 throw new BaseException(ExceptionCodes.TASKID_NOT_EXIST);
  228.                         }                       
  229.                 }
  230.                                
  231.                 return item;
  232.         }       
  233.        
  234. }
复制代码
3.5、异常处理类BaseException
  1. ​        异常处理类BaseException,代码如下:
复制代码
  1. package com.abc.example.exception;
  2. import lombok.Data;
  3. /**
  4. * @className        : BaseException
  5. * @description        : 异常信息基类
  6. * @summary        : 可以处理系统异常和自定义异常
  7. * @history        :
  8. * ------------------------------------------------------------------------------
  9. * date                        version                modifier                remarks                  
  10. * ------------------------------------------------------------------------------
  11. * 2021/01/01        1.0.0                sheng.zheng                初版
  12. *
  13. */
  14. @Data
  15. public class BaseException extends RuntimeException{
  16.         private static final long serialVersionUID = 4359709211352401087L;
  17.        
  18.         // 异常码
  19.     private int  code ;
  20.    
  21.     // 异常信息ID
  22.     private String messageId;
  23.    
  24.     // 异常信息
  25.     private String message;
  26.    
  27.           // =============== 以下为各种构造函数,重载 ===================================
  28.    
  29.     public BaseException(String message) {
  30.         this.message = message;
  31.     }
  32.     public BaseException(String message, Throwable e) {
  33.         this.message = message;
  34.     }
  35.     public BaseException(int code, String message) {
  36.         this.message = message;
  37.         this.code = code;
  38.     }
  39.     public BaseException(ExceptionCodes e) {
  40.         this.code = e.getCode();
  41.         this.messageId = e.getMessageId();
  42.         this.message = e.getMessage();
  43.     }
  44.    
  45.     public BaseException(ExceptionCodes e,String message) {
  46.         this.code = e.getCode();
  47.         this.messageId = e.getMessageId();
  48.         this.message = e.getMessage() + ":" + message;
  49.     }   
  50.     public BaseException(int code, String message, Throwable e) {
  51.         this.message = message;
  52.         this.code = code;   
  53.         
  54.     }
  55. }
复制代码
3.6、异常信息枚举类ExceptionCodes
  1. ​        异常信息枚举类ExceptionCodes,代码如下:
复制代码
  1. package com.abc.example.exception;
  2. /**
  3. * @className        : ExceptionCodes
  4. * @description        : 异常信息枚举类
  5. * @summary        :
  6. * @history        :
  7. * ------------------------------------------------------------------------------
  8. * date                        version                modifier                remarks                  
  9. * ------------------------------------------------------------------------------
  10. * 2021/01/01        1.0.0                sheng.zheng                初版
  11. *
  12. */
  13. public enum ExceptionCodes {
  14.     // 0-99,reserved for common exception
  15.     SUCCESS(0, "message.SUCCESS", "操作成功"),
  16.     FAILED(1, "message.FAILED", "操作失败"),
  17.     ERROR(99, "message.ERROR", "操作异常"),
  18.     ARGUMENTS_ERROR(2, "message.ARGUMENTS_ERROR","参数错误"),
  19.     TASKID_NOT_EXIST(16, "message.TASKID_NOT_EXIST","任务ID不存在,可能已过期销毁"),
  20.     TASKID_NOT_RIGHTS(17, "message.TASKID_NOT_RIGHTS","无权访问此任务ID"),
  21.     SESSION_IS_NULL(18, "message.SESSION_IS_NULL","session为空,请重新登录"),
  22.     ARGUMENTS_IS_EMPTY(22, "message.ARGUMENTS_IS_EMPTY","参数值不能为空"),
  23.     ADD_OBJECT_FAILED(30, "message.ADD_OBJECT_FAILED", "新增对象失败"),
  24.     ;        // 定义结束
  25.        
  26.         // 返回码
  27.     private int code;
  28.     public int getCode() {
  29.             return this.code;
  30.     }
  31.    
  32.     // 返回消息ID
  33.     private String messageId;
  34.     public String getMessageId() {
  35.             return this.messageId;
  36.     }
  37.     // 返回消息
  38.     private String message;
  39.     public String getMessage() {
  40.             return this.message;
  41.     }
  42.    
  43.     ExceptionCodes(int code, String messageId, String message) {
  44.         this.code = code;
  45.         this.messageId = messageId;
  46.         this.message = message;
  47.     }
  48. }
复制代码
3.7、通用异常处理类UniveralExceptionHandler
  1. ​        通用异常处理类UniveralExceptionHandler,这是一个异常信息捕获的拦截器,代码如下:
复制代码
  1. package com.abc.example.exception;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.web.bind.annotation.ControllerAdvice;
  7. import org.springframework.web.bind.annotation.ExceptionHandler;
  8. import org.springframework.web.bind.annotation.ResponseBody;
  9. /**
  10. * @className        : UniveralExceptionHandler
  11. * @description        : 通用异常处理类
  12. * @summary        :
  13. * @history        :
  14. * ------------------------------------------------------------------------------
  15. * date                        version                modifier                remarks                  
  16. * ------------------------------------------------------------------------------
  17. * 2021/01/01        1.0.0                sheng.zheng                初版
  18. *
  19. */
  20. @ControllerAdvice
  21. public class UniveralExceptionHandler {
  22.         Logger logger = LoggerFactory.getLogger(getClass());
  23.        
  24.         /**
  25.          *
  26.          * @methodName        : handleException
  27.          * @description        : 拦截非业务异常
  28.          * @param e        : Exception类型的异常
  29.          * @return        : JSON格式的异常信息
  30.          * @history     :
  31.          * ------------------------------------------------------------------------------
  32.          * date                        version                modifier                remarks                  
  33.          * ------------------------------------------------------------------------------
  34.          * 2021/01/01        1.0.0                sheng.zheng                初版
  35.          *
  36.          */
  37.     @ResponseBody
  38.     @ExceptionHandler(Exception.class)
  39.     public Map<String,Object> handleException(Exception e) {
  40.             //将异常信息写入日志
  41.         logger.error(e.getMessage(), e);
  42.         //输出通用错误代码和信息
  43.         Map<String,Object> map = new HashMap<>();
  44.         map.put("code", ExceptionCodes.ERROR.getCode());
  45.         map.put("message", ExceptionCodes.ERROR.getMessage());
  46.         return map;
  47.     }
  48.     /**
  49.      *
  50.      * @methodName        : handleBaseException
  51.      * @description        : 拦截业务异常
  52.      * @param e                : BaseException类型的异常
  53.      * @return                : JSON格式的异常信息
  54.      * @history                :
  55.      * ------------------------------------------------------------------------------
  56.      * date                        version                modifier                remarks                  
  57.      * ------------------------------------------------------------------------------
  58.      * 2021/01/01        1.0.0                sheng.zheng                初版
  59.      *
  60.      */
  61.     @ResponseBody
  62.     @ExceptionHandler(BaseException.class)
  63.     public Map<String,Object> handleBaseException(BaseException e) {
  64.             //将异常信息写入日志
  65.         logger.error("业务异常:code:{},messageId:{},message:{}", e.getCode(), e.getMessageId(), e.getMessage());
  66.         //输出错误代码和信息
  67.         Map<String,Object> map = new HashMap<>();
  68.         map.put("code", e.getCode());
  69.         map.put("message" ,e.getMessage());
  70.         return map;
  71.     }
  72.    
  73. }
复制代码
3.8、日志工具类LogUtil
  1. ​        日志工具类LogUtil,相关方法代码如下:
复制代码
  1. package com.abc.example.common.utils;
  2. import lombok.extern.slf4j.Slf4j;
  3. /**
  4. * @className        : LogUtil
  5. * @description        : 日志工具类
  6. * @summary        :
  7. * @history        :
  8. * ------------------------------------------------------------------------------
  9. * date                        version                modifier                remarks                  
  10. * ------------------------------------------------------------------------------
  11. * 2021/01/01        1.0.0                sheng.zheng                初版
  12. *
  13. */
  14. @Slf4j
  15. public class LogUtil {
  16.         /**
  17.          *
  18.          * @methodName        : error
  19.          * @description        : 输出异常信息
  20.          * @param e        : Exception对象
  21.          * @history        :
  22.          * ------------------------------------------------------------------------------
  23.          * date                        version                modifier                remarks                  
  24.          * ------------------------------------------------------------------------------
  25.          * 2021/01/01        1.0.0                sheng.zheng                初版
  26.          *
  27.          */
  28.         public static void error(Exception e) {
  29.                 e.printStackTrace();
  30.                 String ex = getString(e);
  31.                 log.error(ex);
  32.         }       
  33.         /**
  34.          *
  35.          * @methodName        : getString
  36.          * @description        : 获取Exception的getStackTrace信息
  37.          * @param ex        : Exception对象
  38.          * @return        : 错误调用栈信息
  39.          * @history        :
  40.          * ------------------------------------------------------------------------------
  41.          * date                        version                modifier                remarks                  
  42.          * ------------------------------------------------------------------------------
  43.          * 2021/01/01        1.0.0                sheng.zheng                初版
  44.          *
  45.          */
  46.     public static String getString(Exception ex) {
  47.         StringBuilder stack = new StringBuilder();
  48.         StackTraceElement[] sts = ex.getStackTrace();
  49.         for (StackTraceElement st : sts) {
  50.             stack.append(st.toString()).append("\r\n");
  51.         }
  52.         return stack.toString();
  53.     }
  54. }
复制代码
4、异步请求处理测试例子
  1. ​        下面使用一个测试例子,来说明如何使用此框架。
复制代码
4.1、异步任务的业务处理类
  1. ​        假设有一个测试任务服务类TestTaskService,简单起见,不用接口类了,直接就是可实例化的类。这个类有一个需要异步处理的方法,方法名为testTask。
  2. ​        testTask方法只接受TaskInfo类型的参数,但实际参数params为Map字典(相当于JSON对象),包含repeat和delay,这两个参数是testTask方法所需要的。处理结果result此处为字符串类型,这个类型在实际处理时可以是任意类型,只需要与前端有约定即可。
  3. ​        为方便控制器调用,TestTaskService提供两个接口方法:addAsyncTask和getTaskInfo。
  4. ​        addAsyncTask方法,有2个参数,request和请求参数params,请求参数params是控制器@RequestBody的请求参数,或者重新封装的适应testTask处理的参数。对于业务处理类TestTaskService来说,testTask方法需要什么形式和类型的参数,属于内部约定,只要两者匹配即可。本例子比较简单,直接透传HTTP请求参数,作为任务处理的方法参数。addAsyncTask方法,执行输入参数校验(不要等执行任务时,再去校验参数),然后调用任务管理器addTask方法,加入一个任务,并获取任务ID,返回前端。
  5. ​        getTaskInfo方法,有2个参数,request和请求参数params,请求参数params包含任务ID参数,调用任务管理器的getTaskInfo方法。获取TaskInfo对象,然后屏蔽一些不需要展示的信息,返回前端。getTaskInfo方法用于前端轮询,查询任务执行过程和结果。
  6. ​        测试任务服务类TestTaskService,代码如下:
复制代码
[code]package com.abc.example.asyncproc;import java.lang.reflect.Method;import java.util.HashMap;import java.util.Map;import javax.servlet.http.HttpServletRequest;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.abc.example.common.utils.LogUtil;import com.abc.example.common.utils.Utility;import com.abc.example.exception.BaseException;import com.abc.example.exception.ExceptionCodes;/** * @className        : TestTaskService * @description        : 测试任务服务类 * @summary        : * @history        : * ------------------------------------------------------------------------------ * date                        version                modifier                remarks                    * ------------------------------------------------------------------------------ * 2022/08/19        1.0.0                sheng.zheng                初版 * */@Servicepublic class TestTaskService {        // 任务管理器        @Autowired        private TaskManService taskManService;                /**         *          * @methodName                : addAsyncTask         * @description                : 新增一个异步任务         * @summary                : 新增测试任务类型的异步任务,         *         如果处理队列未满,可立即获取任务ID:         *                 根据此任务ID,可以通过调用getTaskInfo,获取任务的处理进度信息;         *                 如果任务处理完毕,任务信息缓存60秒,过期后无法再获取;         *         如果处理队列已满,返回任务队列已满的失败提示。         * @param request        : request对象         * @param params        : 请求参数,形式如下:         *         {         *                 "repeat"        : 10,        // 重复次数,默认为10,可选         *                 "delay"                : 1000,        // 延时毫秒数,默认为1000,可选         *         }         * @return                        : JSON对象,形式如下:         *         {         *                 "taskId"        : 1,        // 任务ID         *         }         * @history                :         * ------------------------------------------------------------------------------         * date                        version                modifier                remarks                            * ------------------------------------------------------------------------------         * 2022/08/19        1.0.0                sheng.zheng                初版         *         */        public Map addAsyncTask(HttpServletRequest request,                        Map params){                // 参数校验                Integer repeat = (Integer)params.get("repeat");                if (repeat == null) {                        repeat = 10;                }                Integer delay = (Integer)params.get("delay");                if (delay == null) {                        delay = 1000;                }                if (repeat
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表