基于java线程池和EasyExcel实现数据异步导入

打印 上一主题 下一主题

主题 985|帖子 985|积分 2955

基于java线程池和EasyExcel实现数据异步导入

2.代码实现

2.1 controller层

  1.     @PostMapping("import")
  2.     public void importExcel(MultipartFile file) throws IOException {
  3.         importService.importExcelAsync(file);
  4.     }
复制代码
2.2 service层

  1. @Resource
  2. private SalariesListener salariesListener;
  3. private ExecutorService executorService = Executors.newFixedThreadPool(20);
  4. public void importExcelAsync(MultipartFile file) {
  5.     // 开20个线程分别处理20个sheet
  6.     List<Callable<Object>> tasks = new ArrayList<>();
  7.     for (int i = 0; i < 20; i++) {
  8.         int num = i;
  9.         tasks.add(() -> {
  10.             EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener)
  11.                     .sheet(num).doRead();
  12.             return null;
  13.         });
  14.     }
  15.     try {
  16.         //等待所有任务完成
  17.         executorService.invokeAll(tasks);
  18.     } catch (InterruptedException e) {
  19.         throw new RuntimeException(e);
  20.     }
  21. }
复制代码
2.3实体

  1. @Data
  2. @TableName("salaries")
  3. public class Salaries {
  4.     private Integer empNo;
  5.     private Integer salary;
  6.     private Date fromDate;
  7.     private Date toDate;
  8. }
复制代码
2.4easyExcel 监听

  1. import com.alibaba.excel.context.AnalysisContext;
  2. import com.alibaba.excel.read.listener.ReadListener;
  3. import com.baomidou.mybatisplus.extension.service.IService;
  4. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  5. import com.woniu.domain.Salaries;
  6. import com.woniu.mapper.SalariesMapper;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.transaction.annotation.Transactional;
  11. import javax.annotation.Resource;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ExecutorService;
  15. import java.util.concurrent.Executors;
  16. import java.util.concurrent.atomic.AtomicInteger;
  17. @Component
  18. public class SalariesListener extends ServiceImpl<SalariesMapper, Salaries> implements ReadListener<Salaries>, IService<Salaries> {
  19.     private static final Log logger = LogFactory.getLog(SalariesListener.class);
  20.     //创建一个线程池,用于异步保存数据
  21.     private ExecutorService executorService = Executors.newFixedThreadPool(20);
  22.     //创建一个线程安全的list,用于存储读取到的数据,使用ThreadLocal保证线程安全
  23.     private ThreadLocal<ArrayList<Salaries>> salariesList = ThreadLocal.withInitial(ArrayList::new);
  24.     //用于统计是第几次插入
  25.     private static AtomicInteger count = new AtomicInteger(1);
  26.    
  27.     //设定需要异步批量插入的条数
  28.     private static final int batchSize = 10000;
  29.     @Resource
  30.     private SalariesListener salariesListener;
  31.     @Override
  32.     @Transactional(rollbackFor = Exception.class)
  33.     public void invoke(Salaries data, AnalysisContext context) {
  34.         //读取excel每一行的数据,添加到list中
  35.         salariesList.get().add(data);
  36.         //如果list的数据大于设定需要异步批量插入的条数,则执行异步插入
  37.         if (salariesList.get().size() >= batchSize) {
  38.             asyncSaveData();
  39.         }
  40.     }
  41.     public void saveData() {
  42.         if (!salariesList.get().isEmpty()) {
  43.             saveBatch(salariesList.get(), salariesList.get().size());
  44.             logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.get().size() + "条数据");
  45.             salariesList.get().clear();
  46.         }
  47.     }
  48.     public void asyncSaveData() {
  49.         if (!salariesList.get().isEmpty()) {
  50.             ArrayList<Salaries> salaries = (ArrayList<Salaries>) salariesList.get().clone();
  51.             executorService.execute(new SaveTask(salaries, salariesListener));
  52.             salariesList.get().clear();
  53.         }
  54.     }
  55.     @Override
  56.     @Transactional(rollbackFor = Exception.class)
  57.     public void doAfterAllAnalysed(AnalysisContext context) {
  58.         logger.info("一个Sheet全部处理完");
  59.         //考虑每个sheet批量插入数据的条数少于异步插入的条数
  60.         asyncSaveData();
  61.     }
  62.     //创建一个线程类,用于异步保存数据
  63.     static class SaveTask implements Runnable {
  64.         private List<Salaries> salariesList;
  65.         private SalariesListener salariesListener;
  66.         public SaveTask(List<Salaries> salariesList, SalariesListener salariesListener) {
  67.             this.salariesList = salariesList;
  68.             this.salariesListener = salariesListener;
  69.         }
  70.         @Override
  71.         public void run() {
  72.             salariesListener.saveBatch(salariesList);
  73.             //打印第几次插入,每次插入的数据
  74.             logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.size() + "条数据");
  75.         }
  76.     }
  77. }
复制代码
2.5 建表语句

  1. CREATE TABLE `salaries` (
  2.   `emp_no` int(11) DEFAULT NULL COMMENT '员工号',
  3.   `salary` int(11) DEFAULT NULL,
  4.   `from_date` datetime DEFAULT NULL,
  5.   `to_date` datetime DEFAULT NULL
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
复制代码
  1. spring:
  2.   servlet:
  3.     multipart:
  4.       max-request-size: 30MB
  5.       max-file-size: 1024MB
  6.   datasource:
  7.     username: root
  8.     password: root
  9.     url: jdbc:mysql://127.0.0.1:3306/llp?rewriteBatchedStatements=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=Asia/Shanghai
  10.   main:
  11.     allow-circular-references: true
复制代码
3.测试验证

可以看到导入95万多条数据,耗时差不多在一份多钟


  • 导入开始时间



  • 导入结束时间



  • 入库数据


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表