`
kanpiaoxue
  • 浏览: 1749003 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类

spring batch(二):核心部分(1):配置Spring batch

 
阅读更多

chapter 3、Batch configuration

1、spring batch 的命名空间

spring xml中指定batch的前缀作为命名空间。

示例:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch.xsd">
	
	<batch:job id="importProductsJob">
	(...)
	</batch:job>
</beans>

 

 

当指定batch命名空间之后,bean中的声明都需要加上batch的前缀,例如:<batch:job id="importProductsJob">

spring的命名空间的前缀可以指定任意的名称,这里采用batch作为前缀,为了方便理解。

 

2、spring batch XML的主要标签有:job、step、tasklet、chunk、job-repository

3、Job配置。job元素是整个配置的顶级元素,它的属性有:

a、id

b、restartable

c、incrementer

d、abstract

e、parent

f、job-repository

 

restartable 属性如果是false,则程序不允许重启。如果发生重启,会抛出JobRestartException异常。

 

<batch:job id="importProductsJob" restartable="false">
    (...)
</batch:job>

 job除了这些属性外,还可以配置验证器<batch:validator ref="parameterValidator" />,用来校验工作参数(job parameters),可以实现JobParametersValidator接口。

如果无法通过验证,会抛出JobParametersInvalidException异常。spring batch提供了一个默认的实现类DefaultJobParametersValidator,完成绝大部分的工作。如果还是无法满足需求,可以自己编码实现接口。

实例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:validator ref="validator"/>
</batch:job>
<bean id="validator" class="org.springframework.batch.core.job.DefaultJobParametersValidator">
	<property name="requiredKeys">
		<set>
			<value>date</value>
		</set>
	</property>
	<property name="optionalKeys">
		<set>
			<value>productId</value>
		</set>
	</property>
</bean>

 

4、step步骤的配置。

step的属性:

a、next

b、parent

c、abstract

示例:

 

<job id="importProductsJob">
	<step id="decompress" next="readWrite">
		(...)
	</step>
	<step id="readWrite">
		(...)
	</step>
</job>

 

5、tasklet和chunk的配置。

tasklet和chunk是用来指定处理过程的。

一个tasklet对应于一个事务性的、潜在可重复的过程中发生的一个步骤。

你可以自己实现Tasklet 接口,定义自己的tasklet。这个特性很有用,比如:用于解压文件,执行系统命令,执行存储过程等待。

你也可以使用系统tasklet的属性有:

a、ref指定应用的bean

b、transaction-manager事物管理器

c、start-limittasklet重试retry的次数

d、allow-start-if-complete如果tasklet成功完成之后,是否可以进行重试retry操作。

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWriteStep">
		<batch:tasklet
			transaction-manager="transactionManager"
			start-limit="3"
			allow-start-if-complete="true">
			(...)
		</batch:tasklet>
	</batch:step>
</batch:job>
<bean id="transactionManager" class="(...)">
	(...)
</bean>

 

chunk ,ChunkOrientedTasklet 类实现类“块处理(chunk processing)”。

配置tasklet很简单,但是配置“块处理”就会复杂一点,因为它涉及的内容更多。

chunk的属性有:

a、reader

b、processor

c、writer

d、commit-interval事物提交一次处理的items的数量。也是chunk的大小。

e、skip-limit跳跃的次数

f、skip-policy跳跃的策略:要实现SkipPolicy接口

g、retry-policy重试的策略:要实现RetryPolicy接口

h、retry-limit最大的重试次数

i、cache-capacity重试的缓存策略

j、reader-transactional-queue从一个拥有事务的JMS的queue读取item数据

k、processor-transactional处理器是否包含事务处理

l、chunk-completion-policychunk的完成策略

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk
				reader="productItemReader"
				processor="productItemProcessor"
				writer="productItemWriter"
				commit-interval="100" 
				skip-limit="20"
				retry-limit="3"
				cache-capacity="100"
				chunk-completion-policy="timeoutCompletionPolicy"/>
		</batch:tasklet>
	</batch:step>
</batch:job>
<bean id="productItemReader" class="(...)">
(...)
</bean>
<bean id="productItemProcessor" class="(...)">
(...)
</bean>
<bean id="productItemWriter" class="(...)">
(...)
</bean>
<bean id="timeoutCompletionPolicy"
	class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy">
		<constructor-arg value="60"/>
</bean>

 

chunk还有几个子标签,包括:reader、processor、writer、skip-policy、retry-policy

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk commit-interval="100">
				<batch:reader>
					<bean class="(...)">
						(...)
					</bean>
				</batch:reader>
				<batch:processor>
					<bean class="(...)">
						(...)
					</bean>
				</batch:processor>
				<batch:writer>
					<bean class="(...)">
						(...)
					</bean>
				</batch:writer>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
</batch:job>

 

chunk的一些其他额外的子标签:retry-listeners、skippable-exception-classes、retryable-exception-classes、streams

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk commit-interval="100"
				skip-limit="10">
				<skippable-exception-classes>
					<include class="org.springframework.batch.item.file.FlatFileParseException"/>
					<exclude class="java.io.FileNotFoundException"/>
				</skippable-exception-classes>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
</batch:job>

 

 

在chunk里面配置streams

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
			<batch:tasklet>
				<batch:chunk reader="productItemReader" writer="compositeWriter"/>
				<streams>
					<stream ref="productItemWriter1"/>
					<stream ref="productItemWriter2"/>
				</streams>
			</batch:tasklet>
	</batch:step>
</batch:job>

<bean id="compositeWriter"
	class="org.springframework.batch.item.support.CompositeItemWriter">
	<property name="delegates">
		<list>
			<ref bean="productItemWriter1"/>
			<ref bean="productItemWriter2"/>
		</list>
	</property>
</bean>

 

使用一个复合的stream形式,如上例的itemWriter,内部是writer1和writer2是stream的,需要在chunk里面的streams元素里面注册。

 

6、事务配置。

    事务是spring batch的一个重要主题。主要作用于batch处理程序的健壮性,chunk处理的合并来完成它的工作。因为事务调用的对象类型不同,你可以在不同的层次配置事务。

示例:

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet transaction-manager="transactionManager" (...)>
			(...)
		</batch:tasklet>
	</batch:step>
</batch:job>

 事务,有几个定义的属性:行为,隔离 isolation,超时 timeout。

spring batch提供transaction-attributes标签来描述这些事务的属性。

 示例:

<batch:tasklet>
	<batch:chunk reader="productItemReader"
		writer="productItemReader"
		commit-interval="100"/>
		<batch:transaction-attributes isolation="DEFAULT"
		propagation="REQUIRED"
		timeout="30"/>
	</batch:chunk>
</batch:tasklet>

 isolation 定义数据库的隔离级别以及对外部事务的可见性。

spring的事务处理是基于java的异常体系的。java里面的异常分为两类:检查型异常(extends Exception)和非检查型异常(extends RuntimeException)

spring对检查型异常进行commit提交处理,对非检查型异常进行rollback回滚处理。

spring batch运行你对不需要触发rollback回滚动作的异常进行定义。你可以在tasklet的no-rollback-exception-class元素中进行指定。

 示例:

<batch:tasklet>
	(...)
	<batch:no-rollback-exception-classes>
		<batch:include
			class="org.springframework.batch.item.validator.ValidationException"/>
	</batch:no-rollback-exception-classes>
</batch:tasklet>

 对于JMS,spring batch提供chunk的reader-transactional-queue 属性,提供事务处理。

 

7、配置job仓库

job仓库(job repository)是spring batch底层基础的一个关键特征,它为spring batch的运行提供信息。

job仓库必须实现JobRepository接口,spring batch只提供了一个具体的实现类:SimpleJobRepository。

spring batch为DAO提供了两种实现:

    a、内存无持久化

    b、jdbc元数据的持久化

第一种“内存无持久化”的方式可以用来spring batch的测试和开发,但是不能应用于生产环境。因为内存中的东西会丢失。

设定job仓库的参数

    (1)、配置内存模式的job仓库:Configuring an in-memory job repository

示例:

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
	<property name="transactionManager-ref" ref="transactionManager"/>
</bean>

<bean id="transactionManager"	class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

<batch:job id="importInvoicesJob"	job-repository="jobRepository">
	(...)
</batch:job>

 由于job仓库是内存模式的,所以事务管理器采用ResourcelessTransactionManager。 这个类是NOOP (NO OPeration)无操作的PlatformTransactionManager接口实现。

     (2)、持久化的job仓库:Configuring a persistent job repository

关系型数据库的job仓库的属性如下:data-source,transaction-manager,isolation-level-for-create,max-varchar-length,table-prefix,lob-handler

示例:

<bean id="dataSource"
	class="org.apache.commons.dbcp.BasicDataSource"
	destroy-method="close">
	<property name="driverClassName" value="${batch.jdbc.driver}" />
	<property name="url" value="${batch.jdbc.url}" />
	<property name="username" value="${batch.jdbc.user}" />
	<property name="password" value="${batch.jdbc.password}" />
</bean>

<bean id="transactionManager" lazy-init="true"
	class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
	<property name="dataSource" ref="dataSource" />
</bean>

<batch:job-repository id="jobRepository"
	data-source="dataSource"
	transaction-manager="transactionManager"
	isolation-level-for-create="SERIALIZABLE"
	table-prefix="BATCH_"
/>

<batch:job id="importInvoicesJob" job-repository="jobRepository">
	(...)
</batch:job>

 [问题:如果在不同的物理节点上面运行同样的job会发生什么呢?]

前提:使用同一份spring batch的元数据,即采用同一份数据库上面的表结构。

当创建job实例和执行信息的元数据的时候,job仓库扮演了一个集中维护的工作库的角色。当job并发运行的时候,spring batch 的job仓库,可以防止创建相同的工作情况。这个功能依赖于底层数据库的事务能力,完成job的同步任务。我们设置job仓库的属性isolation-level-for-create="SERIALIZABLE",来避免发生job的并发问题。由于有这种防护措施,你可以把你的spring batch的job分布到各个不同的物理节点,保证你的job实例不会被创建两次。

     job仓库是spring batch底层结构中的重要部分,它记录了batch处理的信息,并且跟踪job运行的成功与失败。

 

 8、spring batch配置的高级主题。

(1)、使用step作用域。当使用SpEL的时候,step作用域很有用,来实现属性的晚绑定。

(2)、Spring表达式语言:Spring Expression Language (SpEL) 

Spring3.× 开始提供。

 step的作用的实例范围包括:jobParameters、jobExecutionContext、stepExecutionContext

 示例:

 

<bean id="decompressTasklet"
			class="com.manning.sbia.ch01.batch.DecompressTasklet"
			scope="step">
	<property name="inputResource"
	value="#{jobParameters['inputResource']}" />
	<property name="targetDirectory"
	value="#{jobParameters['targetDirectory']}" />
	<property name="targetFile"
	value="#{jobParameters['targetFile']}" />
</bean>

 SpEL由 #{ 和 } 组成

 

 在jobExecutionContext 和 stepExecutionContext也可以应用SpEL表达式。

 (3)、使用Linteners提供的更多的处理。

Spring batch在job和step级别上面提供listener。

Spring batch提供的listener的类型:

a、Job listener:在job级别监听处理过程

b、Step listeners:在step级别监听处理过程

c、Item listeners:监听item的retry重试和repeat重做动作

一、Job listener 可以监听job的运行,在job运行前和后添加动作。可以利用 listener标签,在job标签下面作为子元素进行添加。

示例1:

 

<batch:job id="importProductsJob">
	<batch:listeners>
	<batch:listener ref="importProductsJobListener"/>
	</batch:listeners>
</batch:job>

 importProductsJobListener不管job运行成功还是失败,它都会在job运行的开始和结束的时候,接收job的通知,进行监听。

 

 

还可以通过普通的POJO的java对象来做监听器,只需要进行简单的配置即可。

示例2:

 

<batch:listeners>
    <batch:listener ref="importProductsJobListener" after-job-method="afterJob" before-job-method="beforeJob"/>
</batch:listeners>

 可以在listener里面的ref指定引用的POJO的bean,通过after-job-method="afterJob" before-job-method="beforeJob" 来指定job之前和之后的执行方法。不过,被指定的这两个方法的参数都需要是:JobExecution jobExecution。 这个2个方法的返回值都是void

 

还有一种方法是利用“注释”来配置listener,spring batch会自己发现并运行该类。

 

二、Step listener

    Step有一系列的listener来跟踪step的处理过程。这里所有的listener接口都继承了StepListener接口。

Spring batch提供的Step的listener有:

a、ChunkListener:在chunk执行的之前和之后调用。

b、ItemProcessListener:在 ItemProcessor得到一个item之前和之后调用,在ItemProcessor抛出一个异常的时候调用。

c、ItemReadListener:在读取item之前和读取item之后调用,或者在读取item的过程中触发异常的时候调用。

d、ItemWriteListener:在一个item输出之前和之后调用,或者在item输出的过程中调用。

e、SkipListener:当读取、处理和输出的过程中产生了skip跳跃一个item的时候调用。

f、StepExecutionListener:在step运行之前和之后调用。

 接口代码:

public interface StepExecutionListener extends StepListener {
	void beforeStep(StepExecution stepExecution);
	ExitStatus afterStep(StepExecution stepExecution);
}

public interface ChunkListener extends StepListener {
	void beforeChunk();
	void afterChunk();
}


public interface ItemProcessListener<T, S> extends StepListener {
	void beforeProcess(T item);
	void afterProcess(T item, S result);
	void onProcessError(T item, Exception e);
}

public interface ItemReadListener<T> extends StepListener {
	void beforeRead();
	void afterRead(T item);
	void onReadError(Exception ex);
}

public interface ItemWriteListener<S> extends StepListener {
	void beforeWrite(List<? extends S> items);
	void afterWrite(List<? extends S> items);
	void onWriteError(Exception exception, List<? extends S> items);
}

public interface SkipListener<T,S> extends StepListener {
	void onSkipInRead(Throwable t);
	void onSkipInProcess(T item, Throwable t);
	void onSkipInWrite(S item, Throwable t);
}

 Step listener 作为tasklet标签的一个子标签进行配置。

 上面这些所有的Step listener都可以作为tasklet标签的子标签以相同的方式和等级进行配置。

示例:

	<bean id="importProductsJobListener"
		class="test.case01.java.batch.listener.job.ImportProductsJobListener" />
		
	<bean id="productStepExecutionListener"
		class="test.case01.java.batch.listener.step.ProductStepExecutionListener" />

	<bean id="productChunkListener"
		class="test.case01.java.batch.listener.step.chunk.ProductChunkListener" />
		
	<bean id="productItemProcessListener"
		class="test.case01.java.batch.listener.step.chunk.item.ProductItemProcessListener" />
		
	<batch:job id="importProducts" restartable="false">
		<batch:step id="readWriteProducts">
			<batch:tasklet>
				<batch:chunk reader="reader" writer="writer" processor="processor"
					commit-interval="100" skip-limit="5">
					<batch:skippable-exception-classes>
						<batch:include
							class="org.springframework.batch.item.file.FlatFileParseException" />
					</batch:skippable-exception-classes>
				</batch:chunk>
				<batch:listeners>
					<!-- here configure three kinds listeners for StepExecutionListener ,  ChunkListener , ItemProcessListener  for example.-->
					<batch:listener ref="productStepExecutionListener" />
					<batch:listener ref="productChunkListener" />
					<batch:listener ref="productItemProcessListener" />
				</batch:listeners>
			</batch:tasklet>
		</batch:step>
		<batch:validator ref="parameterValidator" />
		<batch:listeners>
			<batch:listener ref="importProductsJobListener" />
		</batch:listeners>
	</batch:job>

 

 三、retry重试和repeat重做的listener

repeat listener拥有的方法名称:before、after、close(在一个item最后一次repeat重做之后调用,不管repeat成功与否)、onError、open

retry listener拥有的方法名称:close(在一个item上面最后一次尝试retry之后调用,不管retry成功与否)、onError、open

接口代码:

public interface RepeatListener {
	void before(RepeatContext context);
	void after(RepeatContext context, RepeatStatus result);
	void open(RepeatContext context);
	void onError(RepeatContext context, Throwable e);
	void close(RepeatContext context);
}

public interface RetryListener {
	<T> void open(RetryContext context, RetryCallback<T> callback);
	<T> void onError(RetryContext context,
	RetryCallback<T> callback, Throwable e);
	<T> void close(RetryContext context,
	RetryCallback<T> callback, Throwable e);
}

 这2个接口,和上面的step listener的配置位置和方式一致,都是tasklet标签的子标签位置

 

四、配置继承关系

spring的XML提供配置的继承。使用abstract和parent两个属性。从一个parent的bean继承,表示该bean可以利用parent bean中的所有的属性,并且可以覆盖这些属性。

示例:

<bean id="parentBean" abstract="true">
	<property name="propertyOne" value="(...)"/>
</bean>

<bean id="childBean" parent="parentBean">
	<property name="propertyOne" value="(...)"/>
	<property name="propertyTwo" value="(...)"/>
</bean>

 这种继承关系是由spring提供的,在spring batch里面也可以使用。

listeners标签,提供merge属性,可以用来合并parent和自身的listener

 示例:

<job id="parentJob" abstract="true">
	<listeners>
		<listener ref="globalListener"/>
	<listeners>
</job>

<job id="importProductsJob" parent="parentJob">
	(...)
	<listeners merge="true">
		<listener ref="specificListener"/>
	<listeners>
</job>

 spring XML的这种继承关系,使得spring batch的XML配置更简单。

 

 

分享到:
评论
2 楼 hvang1988 2016-04-11  
理解错了,原来还是计数,不是时间
1 楼 hvang1988 2016-04-11  
TimeoutTerminationPolicy 是毫秒 默认30000L 就是30秒

相关推荐

    The Definitive Guide to Spring Batch, 2nd Edition.epub

    Work with all aspects of batch processing in a modern Java environment using a selection of Spring frameworks. This book provides up-to-date examples using the latest configuration techniques based on...

    bank-spring-batch:具有多处理器的Spring Batch项目

    bank-spring-batch:具有多处理器的Spring Batch项目

    SpringBatch+Spring+Mybatis+MySql (spring batch 使用jar)

    Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供...

    Spring.Batch批处理框架

    Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更...

    spring-batch包

    spring-batch4.0.0 batch spring-batch集成 spring-batch.jar

    Spring Boot整合Spring Batch,实现批处理

    Spring Boot整合Spring Batch的一个小例子,在网上发现这方面的资源比较少,特此将其上传供大家学习。

    Spring Batch API(Spring Batch 开发文档).CHM

    Spring Batch API(Spring Batch 开发文档).CHM。 官网 Spring Batch API,Spring Batch 开发文档

    Spring Batch in Action英文pdf版

    Spring Batch in Action英文pdf版,最新Spring教科书

    SpringBoot+Batch实现

    spring batch3.x中文文档:http://www.kailing.pub/SpringBatchReference spring batch官方入门实例:https://projects.spring.io/spring-batch/ 简单的任务操作实现,当前项目只是一个demo,我也是初学,目前启动...

    springbatch 详解PDF附加 全书源码 压缩包

    spring batch批处理框架和对应的源码资源 rar 可以直接运行的

    SpringBatch-DataMigration SpringBatch数据迁移项目

    mybatis、springBatch、mysql、quartz、spring、springMVC 部署说明: 本项目为两个数据库,由一个数据库的表向另外一个数据库的表做数据迁移,其中数据库脚本在:/src/main/resources/sql/下面(其中data_rep中的表...

    Spring Batch批处理框架

    Spring Batch批处理框架Spring Batch批处理框架Spring Batch批处理框架

    SpringBatch批处理 刘相编

    基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring Batch框架中经典的三步走策略:数据读、数据处理和数据写,详尽地介绍了如何对CVS格式文件、JSON格式文件、XML文件、数据库和JMS...

    基于Spring Batch的大数据量并行处理

    基于Spring Batch的大数据量并行处理 基于Spring Batch的大数据量并行处理

    spring batch in action

    Spring Batch in Action is a comprehensive, in-depth guide to writing batch applications using Spring Batch. Written for developers who have basic knowledge of Java and the Spring lightweight ...

    springBoot+springBatch批量处理数据demo

    最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。 最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。

    SpringBatch批处理框架

    资源名称:Spring Batch 批处理框架内容简介:《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的...

    spring batch批处理 教程

    二,Spring Batch结构 4 1,Spring Batch体系结构 4 2,Spring Batch主要对象 5 三,Spring Batch流程介绍 5 四,Spring Batch之Step执行过程介绍 6 五,Spring Batch应用 7 1,简单应用 7  构建应用 7  对象...

    SpringBatch数据库建表语句.txt

    SpringBatch数据库建表语句,存储springBatch批处理过程中需要保存的数据和步骤信息

Global site tag (gtag.js) - Google Analytics