ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

异步排队导出csv文件

2021-11-11 14:32:56  阅读:170  来源: 互联网

标签:异步 int 导出 zos File new csv public


一、简介。

多人同时导出时进行排队等候,可以自定义排队人数,在大数据情况下可以分批处理数据、导出为异步,导出过程中可获取导出进度,导出完成之后调用下载接口。

可以自定义导出的列。

二、代码实现

1、创建队列,队列是使用的java的Queue

首先先创建队列的类

@Slf4j
@Component
public class Queue {

    /**
     * 缓存导出的队列。数量就是你最大几个请求排队
     */
    public static BlockingQueue<ColumnForm> queue = new ArrayBlockingQueue<ColumnForm>(5);
    /**
     * 导出的线程状态
     */
    private static boolean isRunning = false;
    /**
     * 超时时间间隔
     */
    public static long TIMEOUT_INTERVAL = 3 * 60 * 1000;


    public static boolean export(CmsNewsColumnForm cmsNewsColumnForm) {
        boolean rsp = false;
        try {
            //阻塞3秒
            rsp = queue.offer(cmsNewsColumnForm, 3, TimeUnit.SECONDS);

            if (!rsp) {
                throw new Exception(异常信息);
            }
            if (!isRunning) {
                startup(cmsNewsColumnForm);
                isRunning = true;
            }

        } catch (Exception e) {
            throw new Exception(异常信息);
        }
        return rsp;
    }

    public static void startup(CmsNewsColumnForm cmsNewsColumnForm) {
        if (isRunning) {
            return;
        }

        Thread th = new Thread(() -> {
            long timestamp = System.currentTimeMillis();
            //获取service
            Service service = SpringContextUtils.getBean(Service.class);
            while (isRunning) {
                long timeout = timestamp + TIMEOUT_INTERVAL;
                if (System.currentTimeMillis() > timeout) {
                    //空跑一段时间(3分钟)后线程退出
                    break;
                }
                try {
                    if (queue.size() == 0) {
                        Thread.sleep(1000);
                        continue;
                    }
                    //这里是你导出业务的方法
                    Service.export(columnForm);
                    queue.poll();
                } catch (Exception e) {
                    log.error("导出出错", e);
                }
            }
            isRunning = false;
            log.debug("导出线程停止。");
        });;
        th.start();
    }
}

2、编写实现类service。

 @Autowired
 private ExportThread exportThread;

 public void export(ColumnForm ColumnForm) {
        List<Future> futures = new ArrayList<>();
         //定义文件存放的地址
        File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + ColumnForm.getUserId());
        String filePath = file.getPath() + File.separator;
        
        //这里是你预先设置可能会需要导出的列
        LinkedHashMap<String, String> linkedHashMap = ExportDict.newsMap;
        //把map变成线程安全
        Map<String, String> newsMap = Collections.synchronizedMap(new LinkedHashMap<>());
       
        //需要导出的列,前端传过来的列是驼峰的,这里我们做一个转换,把他转为下划线,方便数据库查询使用。
        Map<String, String> map = new HashMap<>();

        ColumnForm.getColumn().forEach(news -> {
            //驼峰转下划线命名
            String chang = StringUtils.toUnderScoreCase(news);
            map.put(news, chang);
            //构建excel表头
            linkedHashMap.forEach((k, v) -> {
                if (k.equals(news)) {
                    newsMap.put(k, v);
                }
            });
        });
        把转换好的列放进map里面
        ColumnForm.setChang(map);

        //下面是分批次导出

        //查询总共需要导出的总数
        int totalCount = this.count();
        //单个excel最大行数
        int pageSize = CommonConstants.POINTS_DATA_LIMIT;
        //总页数
        int pageCount = (totalCount + pageSize - 1) / pageSize;
        int pageBegin = 0;

        //开始和结束条数
        int pageEnd = pageSize;
         
        int percentage = pageCount;

        //把开始的坐标放入redis,用于获取导出进度使用。这里我用userId作为每个人进度的唯一标识,避免进度串。
        redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + cmsNewsColumnForm.getUserId(), Double.valueOf(pageCount));

        //这里把总页数放进redis,用于后面计算百分比时使用。
        redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + cmsNewsColumnForm.getUserId(), percentage);

        //线程池
        ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolConfig.threadPoolTaskExecutor();

        //分批多线程查询数据库数据并且写入到本地路径
        for (int i = 0; i < pageCount; i++) {
            final int begin = pageBegin;
            //设置文件名称(name+i下标防止文件名重复)
            final String fileUrl = filePath + "导出信息" + (i + 1) + ".csv";
 
            //开启线程
            Future<?> submit = threadPoolTaskExecutor.submit(new Runnable() {
                @Override
                public void run() {
             
                List<exportVO> exportVOS = Collections.synchronizedList(new ArrayList<>());
                exportVOS =Mapper.export(cmsNewsColumnForm, begin, pageEnd);
         
      
                exportThread.export(exportVOS, newsMap, fileUrl);

                //每次完成之后递减次数,用于计算进度。
                redisTemplate.getConnectionFactory().getConnection().decr(                            redisTemplate.getKeySerializer().serialize(redisKeyConfig.getKey() + "percentage" + cmsNewsColumnForm.getUserId())
                    );
                }
            });
            //返回的结果集存储
            futures.add(submit);

            //分批的页数
            pageBegin = pageEnd + pageBegin;
        }

        //这里必须要阻塞,否则的话当前主线程不会等待所有子线程执行完后在执行,会直接完成,导致获取不到进度。
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new ServiceException(ResponseCode.THREAD_INTERRUPTION_ERROR.msg(), ResponseCode.THREAD_INTERRUPTION_ERROR.code());
            } catch (ExecutionException e) {
                e.printStackTrace();
                throw new ServiceException();
            }
        });
    }

这是sql、这里使用的是mybatis-plus

    <select id="export" resultType="com.puyiyun.cms.entity.vo.ExportVO">
        SELECT
        <foreach collection="ns.chang.entrySet()" index="key" item="value" open="" separator="," close="">
         ${value} as ${key}
        </foreach>
        FROM users
        LIMIT ${pageBegin},${pageEnd}
    </select>

3、写入的工具类


@Component
public class ExportThread {

    /**
     * 新闻导出异步线程
     *
     * @param newsMap           (需要导出的列)
     * @param fileUrl
     */
    public <T> String exportNews(List<T> data, Map<String, String> newsMap, String fileUrl) {
        try {
            ExcelWriter writer = new ExcelWriter();
            //创建一个不重名文件
            writer = ExcelUtil.getWriter(fileUrl);
            //设置列名
            for (Map.Entry<String, String> entry : newsMap.entrySet()) {
                writer.addHeaderAlias(entry.getKey(), entry.getValue());
            }
            //设置是否只显示设置了别名的字段
            writer.setOnlyAlias(true);
            writer.write(data, true);
            writer.close();
            return fileUrl;
        } catch (Exception e) {
            e.printStackTrace();
            throw new ServiceException("导出异常");
        }
    }
}

4、预设置的列。

/**
 * 导出对应字典类
 */
public class ExportDict {
    public static final LinkedHashMap<String, String> newsMap = new LinkedHashMap<>();

    static{
        newsMap.put("userId","用户id");
        newsMap.put("name","用户名称");
        newsMap.put("address","地址");
   
    }
}

5、获取导出进度的方法(这里是每请求一次获取一次最新进度,需要轮询调用,这里不友好,建议改成用socket推送的方式推送给前端)

    public String exportSchedule() {
        //上下文获取userId
        Long userId = SecurityUtils.getUserId();
        //返回保留两位小数
        NumberFormat nf = new DecimalFormat("0.00 ");
        String outcome = "";
        Double onePercentLimit = 0.0;
        //总次数
        Object exportNewsCount = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
        //当前第几次
        Object percentage = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);
        if (ObjectUtil.isNotNull(exportCount) && ObjectUtil.isNotNull(percentage)) {
            int perChang = (Integer) percentage;
            double arraySize = (Double) exportNewsCount;
            double Count = Double.valueOf(perChang);
            //计算进度百分比
            onePercentLimit = (1.00 - Count / arraySize) * 100.00;
            //保留两位小数输出
            outcome = nf.format(onePercentLimit);
        }
        return outcome;
    }

6、导出完成后点击下载

   public void downloadNews(HttpServletResponse response) {
        try {
            Long userId = SecurityUtils.getUserId();
            File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + userId);
            String filePath = file.getPath() + File.separator;
            File[] fileUrl = FileUtil.ls(filePath);
            response.setContentType("application/zip");
            response.reset();
            response.setCharacterEncoding("utf-8");
            String fileNameCode = URLEncoder.encode("名称", "UTF-8");
            OutputStream outputStream = response.getOutputStream();
            //文件路径集合
            List<File> fileList = new ArrayList<>();
            //多个文件打成压缩包
            if (!ArrayUtil.isEmpty(fileUrl)) {
                if (fileUrl.length > 1) {
                    response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".zip");
                    for (int i = 0; i < fileUrl.length; i++) {
                        String path = fileUrl[i].getPath();
                        fileList.add(new File(path));
                    }
                    ZipUtils.toZip(fileList, outputStream);
                } else {
                    response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".csv");
                    //单个文件不打包
                    FileInputStream fileInputStream = new FileInputStream(fileUrl[0].getPath());
                    byte[] buff = new byte[1024];
                    int i;
                    while ((i = fileInputStream.read(buff)) != -1) {
                        outputStream.write(buff, 0, i);
                        outputStream.flush();
                    }
                    fileInputStream.close();
                }

                //完成后删除进度
                redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
                redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);

                //完成后删除文件
                String parent = fileUrl[0].getParent();
                FileUtil.del(parent);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
            throw new ServiceException("导出失败,请联系管理员", ResponseCode.UNKNOWN.code());
        }
    }

压缩zip的工具类

public class ZipUtils {

    public static void compressionFile(String path, String zipFilePath) {
        File file = new File(path);
        if (file == null || !file.exists() || !file.isDirectory()) {
            return;
        }
        File zipFile = new File(zipFilePath);
        File[] srcFile = file.listFiles();
        byte[] buffer = new byte[1024];
        try {
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
            for (int i = 0; i < srcFile.length; i++) {
                FileInputStream fileInputStream = new FileInputStream(srcFile[i]);
                out.putNextEntry(new ZipEntry(srcFile[i].getName()));
                int length;
                while ((length = fileInputStream.read(buffer)) > 0) {
                    out.write(buffer, 0, length);
                }
                out.closeEntry();
                fileInputStream.close();
            }
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final int BUFFER_SIZE = 2 * 1024;

    /**
     * 压缩成ZIP 方法1
     *
     * @param srcDir           压缩文件夹路径
     * @param out              压缩文件输出流
     * @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
     *                         false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
     * @throws RuntimeException 压缩失败会抛出运行时异常
     */
    public static void toZip(String srcDir, OutputStream out, boolean KeepDirStructure)
            throws RuntimeException {

        long start = System.currentTimeMillis();
        ZipOutputStream zos = null;
        try {
            zos = new ZipOutputStream(out);
            File sourceFile = new File(srcDir);
            compress(sourceFile, zos, sourceFile.getName(), KeepDirStructure);
            long end = System.currentTimeMillis();
            System.out.println("压缩完成,耗时:" + (end - start) + " ms");
        } catch (Exception e) {
            throw new RuntimeException("zip error from ZipUtils", e);
        } finally {
            if (zos != null) {
                try {
                    zos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * 压缩成ZIP 方法2
     *
     * @param srcFiles 需要压缩的文件列表
     * @param out      压缩文件输出流
     * @throws RuntimeException 压缩失败会抛出运行时异常
     */
    public static void toZip(List<File> srcFiles, OutputStream out) throws RuntimeException {
        long start = System.currentTimeMillis();
        ZipOutputStream zos = null;
        try {
            zos = new ZipOutputStream(out);
            for (File srcFile : srcFiles) {
                byte[] buf = new byte[BUFFER_SIZE];
                zos.putNextEntry(new ZipEntry(srcFile.getName()));
                int len;
                FileInputStream in = new FileInputStream(srcFile);
                while ((len = in.read(buf)) != -1) {
                    zos.write(buf, 0, len);
                }
                zos.closeEntry();
                in.close();
            }
            long end = System.currentTimeMillis();
            System.out.println("压缩完成,耗时:" + (end - start) + " ms");
        } catch (Exception e) {
            throw new RuntimeException("zip error from ZipUtils", e);
        } finally {
            if (zos != null) {
                try {
                    zos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * 递归压缩方法
     *
     * @param sourceFile       源文件
     * @param zos              zip输出流
     * @param name             压缩后的名称
     * @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
     *                         false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
     * @throws Exception
     */
    private static void compress(File sourceFile, ZipOutputStream zos, String name,
                                 boolean KeepDirStructure) throws Exception {
        byte[] buf = new byte[BUFFER_SIZE];
        if (sourceFile.isFile()) {
            // 向zip输出流中添加一个zip实体,构造器中name为zip实体的文件的名字
            zos.putNextEntry(new ZipEntry(name));
            // copy文件到zip输出流中
            int len;
            FileInputStream in = new FileInputStream(sourceFile);
            while ((len = in.read(buf)) != -1) {
                zos.write(buf, 0, len);
            }
            // Complete the entry
            zos.closeEntry();
            in.close();
        } else {
            File[] listFiles = sourceFile.listFiles();
            if (listFiles == null || listFiles.length == 0) {
                // 需要保留原来的文件结构时,需要对空文件夹进行处理
                if (KeepDirStructure) {
                    // 空文件夹的处理
                    zos.putNextEntry(new ZipEntry(name + "/"));
                    // 没有文件,不需要文件的copy
                    zos.closeEntry();
                }

            } else {
                for (File file : listFiles) {
                    // 判断是否需要保留原来的文件结构
                    if (KeepDirStructure) {
                        // 注意:file.getName()前面需要带上父文件夹的名字加一斜杠,
                        // 不然最后压缩包中就不能保留原来的文件结构,即:所有文件都跑到压缩包根目录下了
                        compress(file, zos, name + "/" + file.getName(), KeepDirStructure);
                    } else {
                        compress(file, zos, file.getName(), KeepDirStructure);
                    }

                }
            }
        }
    }



三、总结

总的步骤来说就是

1、创建队列

2、导出(导出是先导出到本地磁盘)

3、获取导出进度

4、导出完成后进行下载,下载完成后把进度和文件全部删除

这是第一次写博客,写的不好的地方大家见谅。

标签:异步,int,导出,zos,File,new,csv,public
来源: https://blog.csdn.net/Norman1250/article/details/121266317

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有