马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
概述
流式处置惩罚一个zip,zip里有多个json文件。
流式处置惩罚可以制止解压一个大的zip。再加上多线程,处置惩罚的效率杠杠的。
代码
- package 多线程.demo05多jsonCountDownLatch;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.util.StopWatch;
- import java.io.ByteArrayOutputStream;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.zip.ZipEntry;
- import java.util.zip.ZipInputStream;
- @Slf4j
- public class ZipProcessor {
- private Path path;
- private static int numThreads = Runtime.getRuntime().availableProcessors();
- private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
- private static ObjectMapper objectMapper = new ObjectMapper();
- @SneakyThrows
- public ZipProcessor(String filePath){
- path = Paths.get(filePath);
- if (!Files.exists(path)) {
- throw new FileNotFoundException("The specified ZIP file does not exist: " + filePath);
- }
- }
- public void streamProcess(){
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- // 使用 try-with-resources 保证资源关闭
- try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
- ZipEntry entry;
- while ((entry = zis.getNextEntry()) != null) {
- // 将当前条目的数据读取到字节数组中
- byte[] byteArray = getByteArray(zis);
- process(byteArray, entry);
- // 关闭当前条目的输入流
- zis.closeEntry();
- }
- stopWatch.stop();
- log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
- } catch (IOException e) {
- stopWatch.stop();
- log.error("zip处理异常,耗时:{}秒", stopWatch.getTotalTimeSeconds(), e);
- }
- }
- public void streamParallelProcess() {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
- ZipEntry entry;
- List<Future<?>> futures = new ArrayList<>();
- while ((entry = zis.getNextEntry()) != null) {
- // 为了lambda表达式捕获局部变量
- final ZipEntry currentEntry = entry;
- // 将当前条目的数据读取到字节数组中
- byte[] byteArray = getByteArray(zis);
- Future<?> future = executorService.submit(() -> process(byteArray, currentEntry));
- futures.add(future);
- }
- // 等待所有任务完成
- for (Future<?> future : futures) {
- try {
- future.get();
- } catch (Exception e) {
- log.error("任务执行失败", e);
- }
- }
- } catch (IOException e) {
- log.error("读取ZIP文件异常", e);
- } finally {
- executorService.shutdown();
- try {
- if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
- executorService.shutdownNow();
- }
- } catch (InterruptedException ex) {
- executorService.shutdownNow();
- }
- stopWatch.stop();
- log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
- }
- }
- private void process(byte[] entryData, ZipEntry entry) {
- try {
- // 在这里处理每个条目的数据
- ObjectMapper objectMapper = new ObjectMapper();
- OriginalObject originalObject = objectMapper.readValue(entryData, OriginalObject.class);
- log.info("完成处理:{},sourceFileId:{}", entry.getName(), originalObject.getSourceFileId());
- } catch (IOException e) {
- log.error("处理条目 {} 异常", entry.getName(), e);
- }
- }
- private byte[] getByteArray(ZipInputStream zis) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int length;
- while ((length = zis.read(buffer)) > 0) {
- baos.write(buffer, 0, length);
- }
- return baos.toByteArray();
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |