基于java线程池和EasyExcel实现数据异步导入
2.代码实现
2.1 controller层
- @PostMapping("import")
- public void importExcel(MultipartFile file) throws IOException {
- importService.importExcelAsync(file);
- }
复制代码 2.2 service层
- @Resource
- private SalariesListener salariesListener;
- private ExecutorService executorService = Executors.newFixedThreadPool(20);
- public void importExcelAsync(MultipartFile file) {
- // 开20个线程分别处理20个sheet
- List<Callable<Object>> tasks = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- int num = i;
- tasks.add(() -> {
- EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener)
- .sheet(num).doRead();
- return null;
- });
- }
- try {
- //等待所有任务完成
- executorService.invokeAll(tasks);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
复制代码 2.3实体
- @Data
- @TableName("salaries")
- public class Salaries {
- private Integer empNo;
- private Integer salary;
- private Date fromDate;
- private Date toDate;
- }
复制代码 2.4easyExcel 监听
- import com.alibaba.excel.context.AnalysisContext;
- import com.alibaba.excel.read.listener.ReadListener;
- import com.baomidou.mybatisplus.extension.service.IService;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.woniu.domain.Salaries;
- import com.woniu.mapper.SalariesMapper;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.atomic.AtomicInteger;
- @Component
- public class SalariesListener extends ServiceImpl<SalariesMapper, Salaries> implements ReadListener<Salaries>, IService<Salaries> {
- private static final Log logger = LogFactory.getLog(SalariesListener.class);
- //创建一个线程池,用于异步保存数据
- private ExecutorService executorService = Executors.newFixedThreadPool(20);
- //创建一个线程安全的list,用于存储读取到的数据,使用ThreadLocal保证线程安全
- private ThreadLocal<ArrayList<Salaries>> salariesList = ThreadLocal.withInitial(ArrayList::new);
- //用于统计是第几次插入
- private static AtomicInteger count = new AtomicInteger(1);
-
- //设定需要异步批量插入的条数
- private static final int batchSize = 10000;
- @Resource
- private SalariesListener salariesListener;
- @Override
- @Transactional(rollbackFor = Exception.class)
- public void invoke(Salaries data, AnalysisContext context) {
- //读取excel每一行的数据,添加到list中
- salariesList.get().add(data);
- //如果list的数据大于设定需要异步批量插入的条数,则执行异步插入
- if (salariesList.get().size() >= batchSize) {
- asyncSaveData();
- }
- }
- public void saveData() {
- if (!salariesList.get().isEmpty()) {
- saveBatch(salariesList.get(), salariesList.get().size());
- logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.get().size() + "条数据");
- salariesList.get().clear();
- }
- }
- public void asyncSaveData() {
- if (!salariesList.get().isEmpty()) {
- ArrayList<Salaries> salaries = (ArrayList<Salaries>) salariesList.get().clone();
- executorService.execute(new SaveTask(salaries, salariesListener));
- salariesList.get().clear();
- }
- }
- @Override
- @Transactional(rollbackFor = Exception.class)
- public void doAfterAllAnalysed(AnalysisContext context) {
- logger.info("一个Sheet全部处理完");
- //考虑每个sheet批量插入数据的条数少于异步插入的条数
- asyncSaveData();
- }
- //创建一个线程类,用于异步保存数据
- static class SaveTask implements Runnable {
- private List<Salaries> salariesList;
- private SalariesListener salariesListener;
- public SaveTask(List<Salaries> salariesList, SalariesListener salariesListener) {
- this.salariesList = salariesList;
- this.salariesListener = salariesListener;
- }
- @Override
- public void run() {
- salariesListener.saveBatch(salariesList);
- //打印第几次插入,每次插入的数据
- logger.info("第" + count.getAndAdd(1) + "次插入" + salariesList.size() + "条数据");
- }
- }
- }
复制代码 2.5 建表语句
- CREATE TABLE `salaries` (
- `emp_no` int(11) DEFAULT NULL COMMENT '员工号',
- `salary` int(11) DEFAULT NULL,
- `from_date` datetime DEFAULT NULL,
- `to_date` datetime DEFAULT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
复制代码- spring:
- servlet:
- multipart:
- max-request-size: 30MB
- max-file-size: 1024MB
- datasource:
- username: root
- password: root
- url: jdbc:mysql://127.0.0.1:3306/llp?rewriteBatchedStatements=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=Asia/Shanghai
- main:
- allow-circular-references: true
复制代码 3.测试验证
可以看到导入95万多条数据,耗时差不多在一份多钟
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |