java流式处置惩罚zip+多线程

打印 上一主题 下一主题

主题 1048|帖子 1048|积分 3144

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
概述

流式处置惩罚一个zip,zip里有多个json文件。
流式处置惩罚可以制止解压一个大的zip。再加上多线程,处置惩罚的效率杠杠的。
代码

  1. package 多线程.demo05多jsonCountDownLatch;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import lombok.SneakyThrows;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.util.StopWatch;
  6. import java.io.ByteArrayOutputStream;
  7. import java.io.FileNotFoundException;
  8. import java.io.IOException;
  9. import java.nio.file.Files;
  10. import java.nio.file.Path;
  11. import java.nio.file.Paths;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ExecutorService;
  15. import java.util.concurrent.Executors;
  16. import java.util.concurrent.Future;
  17. import java.util.concurrent.TimeUnit;
  18. import java.util.zip.ZipEntry;
  19. import java.util.zip.ZipInputStream;
  20. @Slf4j
  21. public class ZipProcessor {
  22.     private Path path;
  23.     private static int numThreads = Runtime.getRuntime().availableProcessors();
  24.     private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
  25.     private static ObjectMapper objectMapper = new ObjectMapper();
  26.     @SneakyThrows
  27.     public ZipProcessor(String filePath){
  28.         path = Paths.get(filePath);
  29.         if (!Files.exists(path)) {
  30.             throw new FileNotFoundException("The specified ZIP file does not exist: " + filePath);
  31.         }
  32.     }
  33.     public void streamProcess(){
  34.         StopWatch stopWatch = new StopWatch();
  35.         stopWatch.start();
  36.         // 使用 try-with-resources 保证资源关闭
  37.         try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
  38.             ZipEntry entry;
  39.             while ((entry = zis.getNextEntry()) != null) {
  40.                 // 将当前条目的数据读取到字节数组中
  41.                 byte[] byteArray = getByteArray(zis);
  42.                 process(byteArray, entry);
  43.                 // 关闭当前条目的输入流
  44.                 zis.closeEntry();
  45.             }
  46.             stopWatch.stop();
  47.             log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
  48.         } catch (IOException e) {
  49.             stopWatch.stop();
  50.             log.error("zip处理异常,耗时:{}秒", stopWatch.getTotalTimeSeconds(), e);
  51.         }
  52.     }
  53.     public void streamParallelProcess() {
  54.         StopWatch stopWatch = new StopWatch();
  55.         stopWatch.start();
  56.         try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
  57.             ZipEntry entry;
  58.             List<Future<?>> futures = new ArrayList<>();
  59.             while ((entry = zis.getNextEntry()) != null) {
  60.                 // 为了lambda表达式捕获局部变量
  61.                 final ZipEntry currentEntry = entry;
  62.                 // 将当前条目的数据读取到字节数组中
  63.                 byte[] byteArray = getByteArray(zis);
  64.                 Future<?> future = executorService.submit(() -> process(byteArray, currentEntry));
  65.                 futures.add(future);
  66.             }
  67.             // 等待所有任务完成
  68.             for (Future<?> future : futures) {
  69.                 try {
  70.                     future.get();
  71.                 } catch (Exception e) {
  72.                     log.error("任务执行失败", e);
  73.                 }
  74.             }
  75.         } catch (IOException e) {
  76.             log.error("读取ZIP文件异常", e);
  77.         } finally {
  78.             executorService.shutdown();
  79.             try {
  80.                 if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
  81.                     executorService.shutdownNow();
  82.                 }
  83.             } catch (InterruptedException ex) {
  84.                 executorService.shutdownNow();
  85.             }
  86.             stopWatch.stop();
  87.             log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
  88.         }
  89.     }
  90.     private void process(byte[] entryData, ZipEntry entry) {
  91.         try {
  92.             // 在这里处理每个条目的数据
  93.             ObjectMapper objectMapper = new ObjectMapper();
  94.             OriginalObject originalObject = objectMapper.readValue(entryData, OriginalObject.class);
  95.             log.info("完成处理:{},sourceFileId:{}", entry.getName(), originalObject.getSourceFileId());
  96.         } catch (IOException e) {
  97.             log.error("处理条目 {} 异常", entry.getName(), e);
  98.         }
  99.     }
  100.     private byte[] getByteArray(ZipInputStream zis) throws IOException {
  101.         ByteArrayOutputStream baos = new ByteArrayOutputStream();
  102.         byte[] buffer = new byte[1024];
  103.         int length;
  104.         while ((length = zis.read(buffer)) > 0) {
  105.             baos.write(buffer, 0, length);
  106.         }
  107.         return baos.toByteArray();
  108.     }
  109. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

去皮卡多

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表