ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

SpringBatch批处理框架+mysql仓库+web监控实录

2020-02-04 20:00:12  阅读:224  来源: 互联网

标签:web 定义 批处理 数据源 item step ItemProcessor mysql SpringBatch


1、概念

Spring Batch 是一款轻量级地适合企业级应用的批处理框架,值得注意的是,不同于其他调度框架,Spring Batch不提供调度功能。

2、批处理过程

批处理可以分为以下几个步骤:

  1. 读取数据
  2. 按照业务处理数据
  3. 归档数据的过程

3、Spring Batch给我们提供了什么?

  1. 统一的读写接口
  2. 丰富的任务处理方式
  3. 灵活的事务管理及并发处理
  4. 日志、监控、任务重启与跳过等特性

4、基础组件

名称 用途
JobRepository 用于注册和存储Job的容器
JobLauncher 用于启动Job
Job 实际要执行的作业,包含一个或多个step
step 步骤,批处理的步骤一般包含ItemReader, ItemProcessor, ItemWriter
ItemReader 从给定的数据源读取item
ItemProcessor 在item写入数据源之前进行数据整理
ItemWriter 把Chunk中包含的item写入数据源。
Chunk 数据块,给定数量的item集合,让item进行多次读和处理,当满足一定数量的时候再一次写入。
TaskLet 子任务表, step的一个事务过程,包含重复执行,同步/异步规则等。

5、job, step, tasklet 和 chunk 关系

一个job对应至少一个step,一个step对应0或者1个TaskLet,一个taskLet对应0或者1个Chunk

SpringBatch批处理框架+mysql仓库+web监控实录

6、实战:批处理excel插入数据库

6.1:定义数据仓库

  <!-- 内存仓库  -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>-->

    <!-- 数据库仓库  -->
    <batch:job-repository id="jobRepository" data-source="dataRepDruidDataSource"
                          isolation-level-for-create="SERIALIZABLE" transaction-manager="transactionManager"
                          table-prefix="BATCH_" max-varchar-length="1000" />

6.2:定义启动器

    <!-- 作业调度器,用来启动job,引用作业仓库 -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

6.3:定义JOB

    <batch:job id="userBatchJobName" restartable="true">
        <batch:step id="userStep">
            <batch:tasklet allow-start-if-complete="false"
                           start-limit="1" task-executor="taskExecutor" throttle-limit="5">
                <batch:chunk reader="userReader" writer="userWriter"
                             processor="userProcessor" commit-interval="5" retry-limit="10">
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DuplicateKeyException"/>
                        <batch:include class="java.sql.BatchUpdateException"/>
                        <batch:include class="java.sql.SQLException"/>
                    </batch:retryable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="taskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 线程池维护线程的最少数量 -->
        <property name="corePoolSize" value="100"/>
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="30000"/>
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="300"/>
        <!-- 线程池所使用的缓冲队列 -->
        <property name="queueCapacity" value="100"/>
    </bean>

6.4:定义ItemReader

     <bean id="userReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="classpath:message/batch-data-source.csv"/>
    </bean>
 <!-- 将每行映射成对象 -->
    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=","/><!-- 根据某种分隔符分割 -->
                <property name="names" value="id,name" />
            </bean>
        </property>
        <property name="fieldSetMapper"><!-- 将拆分后的字段映射成对象 -->
            <bean class="com.hcw.core.batch.UserFieldSetMapper" />
        </property>
    </bean>

6.5:定义ItemWriter

     <bean id="userWriter" class="com.hcw.core.batch.MyBatchItemWriter" scope="step">
        <property name="statementId" value="com.hcw.core.batch.dao.UserToMapper.batchInsert"/>
        <property name="sqlSessionFactory" ref="sqlSessionFactoryTo"/>
    </bean>

6.6:定义ItemProcessor

    <bean id="userProcessor" class="com.hcw.core.batch.UserItemProcessor"/>

6.7: 定义jobRepository的数据源

   <bean id="dataRepDruidDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.mysql.rep.connection.url}" />
        <property name="username" value="${jdbc.mysql.rep.connection.username}" />
        <property name="password" value="${jdbc.mysql.rep.connection.password}" />
        <property name="filters" value="${jdbc.mysql.rep.connection.filters}" />
        <property name="maxActive" value="${jdbc.mysql.rep.connection.maxActive}" />
        <property name="initialSize" value="${jdbc.mysql.rep.connection.initialSize}" />
        <property name="maxWait" value="${jdbc.mysql.rep.connection.maxWait}" />
        <property name="minIdle" value="${jdbc.mysql.rep.connection.minIdle}" />
        <property name="timeBetweenEvictionRunsMillis"
                  value="${jdbc.mysql.rep.connection.timeBetweenEvictionRunsMillis}" />
        <property name="minEvictableIdleTimeMillis"
                  value="${jdbc.mysql.rep.connection.minEvictableIdleTimeMillis}" />
        <property name="validationQuery"
                  value="${jdbc.mysql.rep.connection.validationQuery}" />
        <property name="testWhileIdle"
                  value="${jdbc.mysql.rep.connection.testWhileIdle}" />
        <property name="testOnBorrow" value="${jdbc.mysql.rep.connection.testOnBorrow}" />
        <property name="testOnReturn" value="${jdbc.mysql.rep.connection.testOnReturn}" />
        <property name="poolPreparedStatements"
                  value="${jdbc.mysql.rep.connection.poolPreparedStatements}" />
        <property name="maxPoolPreparedStatementPerConnectionSize"
                  value="${jdbc.mysql.rep.connection.maxPoolPreparedStatementPerConnectionSize}" />
    </bean>

6.8: 启动JOB

启动tomcat,打开启动页面

SpringBatch批处理框架+mysql仓库+web监控实录

标签:web,定义,批处理,数据源,item,step,ItemProcessor,mysql,SpringBatch
来源: https://blog.51cto.com/14570694/2469184

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有