`
kanpiaoxue
  • 浏览: 1744917 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spring batch(二):核心部分(1):配置Spring batch

 
阅读更多

chapter 3、Batch configuration

1、spring batch 的命名空间

spring xml中指定batch的前缀作为命名空间。

示例:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch.xsd">
	
	<batch:job id="importProductsJob">
	(...)
	</batch:job>
</beans>

 

 

当指定batch命名空间之后,bean中的声明都需要加上batch的前缀,例如:<batch:job id="importProductsJob">

spring的命名空间的前缀可以指定任意的名称,这里采用batch作为前缀,为了方便理解。

 

2、spring batch XML的主要标签有:job、step、tasklet、chunk、job-repository

3、Job配置。job元素是整个配置的顶级元素,它的属性有:

a、id

b、restartable

c、incrementer

d、abstract

e、parent

f、job-repository

 

restartable 属性如果是false,则程序不允许重启。如果发生重启,会抛出JobRestartException异常。

 

<batch:job id="importProductsJob" restartable="false">
    (...)
</batch:job>

 job除了这些属性外,还可以配置验证器<batch:validator ref="parameterValidator" />,用来校验工作参数(job parameters),可以实现JobParametersValidator接口。

如果无法通过验证,会抛出JobParametersInvalidException异常。spring batch提供了一个默认的实现类DefaultJobParametersValidator,完成绝大部分的工作。如果还是无法满足需求,可以自己编码实现接口。

实例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:validator ref="validator"/>
</batch:job>
<bean id="validator" class="org.springframework.batch.core.job.DefaultJobParametersValidator">
	<property name="requiredKeys">
		<set>
			<value>date</value>
		</set>
	</property>
	<property name="optionalKeys">
		<set>
			<value>productId</value>
		</set>
	</property>
</bean>

 

4、step步骤的配置。

step的属性:

a、next

b、parent

c、abstract

示例:

 

<job id="importProductsJob">
	<step id="decompress" next="readWrite">
		(...)
	</step>
	<step id="readWrite">
		(...)
	</step>
</job>

 

5、tasklet和chunk的配置。

tasklet和chunk是用来指定处理过程的。

一个tasklet对应于一个事务性的、潜在可重复的过程中发生的一个步骤。

你可以自己实现Tasklet 接口,定义自己的tasklet。这个特性很有用,比如:用于解压文件,执行系统命令,执行存储过程等待。

你也可以使用系统tasklet的属性有:

a、ref指定应用的bean

b、transaction-manager事物管理器

c、start-limittasklet重试retry的次数

d、allow-start-if-complete如果tasklet成功完成之后,是否可以进行重试retry操作。

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWriteStep">
		<batch:tasklet
			transaction-manager="transactionManager"
			start-limit="3"
			allow-start-if-complete="true">
			(...)
		</batch:tasklet>
	</batch:step>
</batch:job>
<bean id="transactionManager" class="(...)">
	(...)
</bean>

 

chunk ,ChunkOrientedTasklet 类实现类“块处理(chunk processing)”。

配置tasklet很简单,但是配置“块处理”就会复杂一点,因为它涉及的内容更多。

chunk的属性有:

a、reader

b、processor

c、writer

d、commit-interval事物提交一次处理的items的数量。也是chunk的大小。

e、skip-limit跳跃的次数

f、skip-policy跳跃的策略:要实现SkipPolicy接口

g、retry-policy重试的策略:要实现RetryPolicy接口

h、retry-limit最大的重试次数

i、cache-capacity重试的缓存策略

j、reader-transactional-queue从一个拥有事务的JMS的queue读取item数据

k、processor-transactional处理器是否包含事务处理

l、chunk-completion-policychunk的完成策略

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk
				reader="productItemReader"
				processor="productItemProcessor"
				writer="productItemWriter"
				commit-interval="100" 
				skip-limit="20"
				retry-limit="3"
				cache-capacity="100"
				chunk-completion-policy="timeoutCompletionPolicy"/>
		</batch:tasklet>
	</batch:step>
</batch:job>
<bean id="productItemReader" class="(...)">
(...)
</bean>
<bean id="productItemProcessor" class="(...)">
(...)
</bean>
<bean id="productItemWriter" class="(...)">
(...)
</bean>
<bean id="timeoutCompletionPolicy"
	class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy">
		<constructor-arg value="60"/>
</bean>

 

chunk还有几个子标签,包括:reader、processor、writer、skip-policy、retry-policy

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk commit-interval="100">
				<batch:reader>
					<bean class="(...)">
						(...)
					</bean>
				</batch:reader>
				<batch:processor>
					<bean class="(...)">
						(...)
					</bean>
				</batch:processor>
				<batch:writer>
					<bean class="(...)">
						(...)
					</bean>
				</batch:writer>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
</batch:job>

 

chunk的一些其他额外的子标签:retry-listeners、skippable-exception-classes、retryable-exception-classes、streams

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet>
			<batch:chunk commit-interval="100"
				skip-limit="10">
				<skippable-exception-classes>
					<include class="org.springframework.batch.item.file.FlatFileParseException"/>
					<exclude class="java.io.FileNotFoundException"/>
				</skippable-exception-classes>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
</batch:job>

 

 

在chunk里面配置streams

示例:

 

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
			<batch:tasklet>
				<batch:chunk reader="productItemReader" writer="compositeWriter"/>
				<streams>
					<stream ref="productItemWriter1"/>
					<stream ref="productItemWriter2"/>
				</streams>
			</batch:tasklet>
	</batch:step>
</batch:job>

<bean id="compositeWriter"
	class="org.springframework.batch.item.support.CompositeItemWriter">
	<property name="delegates">
		<list>
			<ref bean="productItemWriter1"/>
			<ref bean="productItemWriter2"/>
		</list>
	</property>
</bean>

 

使用一个复合的stream形式,如上例的itemWriter,内部是writer1和writer2是stream的,需要在chunk里面的streams元素里面注册。

 

6、事务配置。

    事务是spring batch的一个重要主题。主要作用于batch处理程序的健壮性,chunk处理的合并来完成它的工作。因为事务调用的对象类型不同,你可以在不同的层次配置事务。

示例:

<batch:job id="importProductsJob">
	(...)
	<batch:step id="readWrite">
		<batch:tasklet transaction-manager="transactionManager" (...)>
			(...)
		</batch:tasklet>
	</batch:step>
</batch:job>

 事务,有几个定义的属性:行为,隔离 isolation,超时 timeout。

spring batch提供transaction-attributes标签来描述这些事务的属性。

 示例:

<batch:tasklet>
	<batch:chunk reader="productItemReader"
		writer="productItemReader"
		commit-interval="100"/>
		<batch:transaction-attributes isolation="DEFAULT"
		propagation="REQUIRED"
		timeout="30"/>
	</batch:chunk>
</batch:tasklet>

 isolation 定义数据库的隔离级别以及对外部事务的可见性。

spring的事务处理是基于java的异常体系的。java里面的异常分为两类:检查型异常(extends Exception)和非检查型异常(extends RuntimeException)

spring对检查型异常进行commit提交处理,对非检查型异常进行rollback回滚处理。

spring batch运行你对不需要触发rollback回滚动作的异常进行定义。你可以在tasklet的no-rollback-exception-class元素中进行指定。

 示例:

<batch:tasklet>
	(...)
	<batch:no-rollback-exception-classes>
		<batch:include
			class="org.springframework.batch.item.validator.ValidationException"/>
	</batch:no-rollback-exception-classes>
</batch:tasklet>

 对于JMS,spring batch提供chunk的reader-transactional-queue 属性,提供事务处理。

 

7、配置job仓库

job仓库(job repository)是spring batch底层基础的一个关键特征,它为spring batch的运行提供信息。

job仓库必须实现JobRepository接口,spring batch只提供了一个具体的实现类:SimpleJobRepository。

spring batch为DAO提供了两种实现:

    a、内存无持久化

    b、jdbc元数据的持久化

第一种“内存无持久化”的方式可以用来spring batch的测试和开发,但是不能应用于生产环境。因为内存中的东西会丢失。

设定job仓库的参数

    (1)、配置内存模式的job仓库:Configuring an in-memory job repository

示例:

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
	<property name="transactionManager-ref" ref="transactionManager"/>
</bean>

<bean id="transactionManager"	class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

<batch:job id="importInvoicesJob"	job-repository="jobRepository">
	(...)
</batch:job>

 由于job仓库是内存模式的,所以事务管理器采用ResourcelessTransactionManager。 这个类是NOOP (NO OPeration)无操作的PlatformTransactionManager接口实现。

     (2)、持久化的job仓库:Configuring a persistent job repository

关系型数据库的job仓库的属性如下:data-source,transaction-manager,isolation-level-for-create,max-varchar-length,table-prefix,lob-handler

示例:

<bean id="dataSource"
	class="org.apache.commons.dbcp.BasicDataSource"
	destroy-method="close">
	<property name="driverClassName" value="${batch.jdbc.driver}" />
	<property name="url" value="${batch.jdbc.url}" />
	<property name="username" value="${batch.jdbc.user}" />
	<property name="password" value="${batch.jdbc.password}" />
</bean>

<bean id="transactionManager" lazy-init="true"
	class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
	<property name="dataSource" ref="dataSource" />
</bean>

<batch:job-repository id="jobRepository"
	data-source="dataSource"
	transaction-manager="transactionManager"
	isolation-level-for-create="SERIALIZABLE"
	table-prefix="BATCH_"
/>

<batch:job id="importInvoicesJob" job-repository="jobRepository">
	(...)
</batch:job>

 [问题:如果在不同的物理节点上面运行同样的job会发生什么呢?]

前提:使用同一份spring batch的元数据,即采用同一份数据库上面的表结构。

当创建job实例和执行信息的元数据的时候,job仓库扮演了一个集中维护的工作库的角色。当job并发运行的时候,spring batch 的job仓库,可以防止创建相同的工作情况。这个功能依赖于底层数据库的事务能力,完成job的同步任务。我们设置job仓库的属性isolation-level-for-create="SERIALIZABLE",来避免发生job的并发问题。由于有这种防护措施,你可以把你的spring batch的job分布到各个不同的物理节点,保证你的job实例不会被创建两次。

     job仓库是spring batch底层结构中的重要部分,它记录了batch处理的信息,并且跟踪job运行的成功与失败。

 

 8、spring batch配置的高级主题。

(1)、使用step作用域。当使用SpEL的时候,step作用域很有用,来实现属性的晚绑定。

(2)、Spring表达式语言:Spring Expression Language (SpEL) 

Spring3.× 开始提供。

 step的作用的实例范围包括:jobParameters、jobExecutionContext、stepExecutionContext

 示例:

 

<bean id="decompressTasklet"
			class="com.manning.sbia.ch01.batch.DecompressTasklet"
			scope="step">
	<property name="inputResource"
	value="#{jobParameters['inputResource']}" />
	<property name="targetDirectory"
	value="#{jobParameters['targetDirectory']}" />
	<property name="targetFile"
	value="#{jobParameters['targetFile']}" />
</bean>

 SpEL由 #{ 和 } 组成

 

 在jobExecutionContext 和 stepExecutionContext也可以应用SpEL表达式。

 (3)、使用Linteners提供的更多的处理。

Spring batch在job和step级别上面提供listener。

Spring batch提供的listener的类型:

a、Job listener:在job级别监听处理过程

b、Step listeners:在step级别监听处理过程

c、Item listeners:监听item的retry重试和repeat重做动作

一、Job listener 可以监听job的运行,在job运行前和后添加动作。可以利用 listener标签,在job标签下面作为子元素进行添加。

示例1:

 

<batch:job id="importProductsJob">
	<batch:listeners>
	<batch:listener ref="importProductsJobListener"/>
	</batch:listeners>
</batch:job>

 importProductsJobListener不管job运行成功还是失败,它都会在job运行的开始和结束的时候,接收job的通知,进行监听。

 

 

还可以通过普通的POJO的java对象来做监听器,只需要进行简单的配置即可。

示例2:

 

<batch:listeners>
    <batch:listener ref="importProductsJobListener" after-job-method="afterJob" before-job-method="beforeJob"/>
</batch:listeners>

 可以在listener里面的ref指定引用的POJO的bean,通过after-job-method="afterJob" before-job-method="beforeJob" 来指定job之前和之后的执行方法。不过,被指定的这两个方法的参数都需要是:JobExecution jobExecution。 这个2个方法的返回值都是void

 

还有一种方法是利用“注释”来配置listener,spring batch会自己发现并运行该类。

 

二、Step listener

    Step有一系列的listener来跟踪step的处理过程。这里所有的listener接口都继承了StepListener接口。

Spring batch提供的Step的listener有:

a、ChunkListener:在chunk执行的之前和之后调用。

b、ItemProcessListener:在 ItemProcessor得到一个item之前和之后调用,在ItemProcessor抛出一个异常的时候调用。

c、ItemReadListener:在读取item之前和读取item之后调用,或者在读取item的过程中触发异常的时候调用。

d、ItemWriteListener:在一个item输出之前和之后调用,或者在item输出的过程中调用。

e、SkipListener:当读取、处理和输出的过程中产生了skip跳跃一个item的时候调用。

f、StepExecutionListener:在step运行之前和之后调用。

 接口代码:

public interface StepExecutionListener extends StepListener {
	void beforeStep(StepExecution stepExecution);
	ExitStatus afterStep(StepExecution stepExecution);
}

public interface ChunkListener extends StepListener {
	void beforeChunk();
	void afterChunk();
}


public interface ItemProcessListener<T, S> extends StepListener {
	void beforeProcess(T item);
	void afterProcess(T item, S result);
	void onProcessError(T item, Exception e);
}

public interface ItemReadListener<T> extends StepListener {
	void beforeRead();
	void afterRead(T item);
	void onReadError(Exception ex);
}

public interface ItemWriteListener<S> extends StepListener {
	void beforeWrite(List<? extends S> items);
	void afterWrite(List<? extends S> items);
	void onWriteError(Exception exception, List<? extends S> items);
}

public interface SkipListener<T,S> extends StepListener {
	void onSkipInRead(Throwable t);
	void onSkipInProcess(T item, Throwable t);
	void onSkipInWrite(S item, Throwable t);
}

 Step listener 作为tasklet标签的一个子标签进行配置。

 上面这些所有的Step listener都可以作为tasklet标签的子标签以相同的方式和等级进行配置。

示例:

	<bean id="importProductsJobListener"
		class="test.case01.java.batch.listener.job.ImportProductsJobListener" />
		
	<bean id="productStepExecutionListener"
		class="test.case01.java.batch.listener.step.ProductStepExecutionListener" />

	<bean id="productChunkListener"
		class="test.case01.java.batch.listener.step.chunk.ProductChunkListener" />
		
	<bean id="productItemProcessListener"
		class="test.case01.java.batch.listener.step.chunk.item.ProductItemProcessListener" />
		
	<batch:job id="importProducts" restartable="false">
		<batch:step id="readWriteProducts">
			<batch:tasklet>
				<batch:chunk reader="reader" writer="writer" processor="processor"
					commit-interval="100" skip-limit="5">
					<batch:skippable-exception-classes>
						<batch:include
							class="org.springframework.batch.item.file.FlatFileParseException" />
					</batch:skippable-exception-classes>
				</batch:chunk>
				<batch:listeners>
					<!-- here configure three kinds listeners for StepExecutionListener ,  ChunkListener , ItemProcessListener  for example.-->
					<batch:listener ref="productStepExecutionListener" />
					<batch:listener ref="productChunkListener" />
					<batch:listener ref="productItemProcessListener" />
				</batch:listeners>
			</batch:tasklet>
		</batch:step>
		<batch:validator ref="parameterValidator" />
		<batch:listeners>
			<batch:listener ref="importProductsJobListener" />
		</batch:listeners>
	</batch:job>

 

 三、retry重试和repeat重做的listener

repeat listener拥有的方法名称:before、after、close(在一个item最后一次repeat重做之后调用,不管repeat成功与否)、onError、open

retry listener拥有的方法名称:close(在一个item上面最后一次尝试retry之后调用,不管retry成功与否)、onError、open

接口代码:

public interface RepeatListener {
	void before(RepeatContext context);
	void after(RepeatContext context, RepeatStatus result);
	void open(RepeatContext context);
	void onError(RepeatContext context, Throwable e);
	void close(RepeatContext context);
}

public interface RetryListener {
	<T> void open(RetryContext context, RetryCallback<T> callback);
	<T> void onError(RetryContext context,
	RetryCallback<T> callback, Throwable e);
	<T> void close(RetryContext context,
	RetryCallback<T> callback, Throwable e);
}

 这2个接口,和上面的step listener的配置位置和方式一致,都是tasklet标签的子标签位置

 

四、配置继承关系

spring的XML提供配置的继承。使用abstract和parent两个属性。从一个parent的bean继承,表示该bean可以利用parent bean中的所有的属性,并且可以覆盖这些属性。

示例:

<bean id="parentBean" abstract="true">
	<property name="propertyOne" value="(...)"/>
</bean>

<bean id="childBean" parent="parentBean">
	<property name="propertyOne" value="(...)"/>
	<property name="propertyTwo" value="(...)"/>
</bean>

 这种继承关系是由spring提供的,在spring batch里面也可以使用。

listeners标签,提供merge属性,可以用来合并parent和自身的listener

 示例:

<job id="parentJob" abstract="true">
	<listeners>
		<listener ref="globalListener"/>
	<listeners>
</job>

<job id="importProductsJob" parent="parentJob">
	(...)
	<listeners merge="true">
		<listener ref="specificListener"/>
	<listeners>
</job>

 spring XML的这种继承关系,使得spring batch的XML配置更简单。

 

 

分享到:
评论
2 楼 hvang1988 2016-04-11  
理解错了,原来还是计数,不是时间
1 楼 hvang1988 2016-04-11  
TimeoutTerminationPolicy 是毫秒 默认30000L 就是30秒

相关推荐

    bank-spring-batch:具有多处理器的Spring Batch项目

    bank-spring-batch:具有多处理器的Spring Batch项目

    SpringBatch+Spring+Mybatis+MySql (spring batch 使用jar)

    Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供...

    Spring.Batch批处理框架

    Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更...

    spring-batch包

    spring-batch4.0.0 batch spring-batch集成 spring-batch.jar

    Spring Boot整合Spring Batch,实现批处理

    Spring Boot整合Spring Batch的一个小例子,在网上发现这方面的资源比较少,特此将其上传供大家学习。

    Spring Batch API(Spring Batch 开发文档).CHM

    Spring Batch API(Spring Batch 开发文档).CHM。 官网 Spring Batch API,Spring Batch 开发文档

    SpringBoot+Batch实现

    spring batch3.x中文文档:http://www.kailing.pub/SpringBatchReference spring batch官方入门实例:https://projects.spring.io/spring-batch/ 简单的任务操作实现,当前项目只是一个demo,我也是初学,目前启动...

    springbatch 详解PDF附加 全书源码 压缩包

    spring batch批处理框架和对应的源码资源 rar 可以直接运行的

    SpringBatch-DataMigration SpringBatch数据迁移项目

    mybatis、springBatch、mysql、quartz、spring、springMVC 部署说明: 本项目为两个数据库,由一个数据库的表向另外一个数据库的表做数据迁移,其中数据库脚本在:/src/main/resources/sql/下面(其中data_rep中的表...

    The Definitive Guide to Spring Batch, 2nd Edition.epub

    Work with all aspects of batch processing in a modern Java environment using a selection of Spring frameworks. This book provides up-to-date examples using the latest configuration techniques based on...

    SpringBatch批处理 刘相编

    基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring Batch框架中经典的三步走策略:数据读、数据处理和数据写,详尽地介绍了如何对CVS格式文件、JSON格式文件、XML文件、数据库和JMS...

    基于Spring Batch的大数据量并行处理

    基于Spring Batch的大数据量并行处理 基于Spring Batch的大数据量并行处理

    spring batch in action

    Spring Batch in Action is a comprehensive, in-depth guide to writing batch applications using Spring Batch. Written for developers who have basic knowledge of Java and the Spring lightweight ...

    springBoot+springBatch批量处理数据demo

    最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。 最近在研究springBoot+springbatch ,按照官网的实例做了一个实例。

    SpringBatch批处理框架

    资源名称:Spring Batch 批处理框架内容简介:《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的...

    spring batch批处理 教程

    二,Spring Batch结构 4 1,Spring Batch体系结构 4 2,Spring Batch主要对象 5 三,Spring Batch流程介绍 5 四,Spring Batch之Step执行过程介绍 6 五,Spring Batch应用 7 1,简单应用 7  构建应用 7  对象...

    SpringBatch数据库建表语句.txt

    SpringBatch数据库建表语句,存储springBatch批处理过程中需要保存的数据和步骤信息

    Spring Batch批处理框架

    轻松引领进入数据批处理世界:基本特性...深度探索Spring Batch 批处理框架的核心概念:作业配置、作业步配置,以及Spring Batch 框架中经典的三步走策略。 快速提升数据批处理的能力:高性能、高可靠性、并行处理。

    详细spring batch资料

    难得的详细spring batch资料 难得的详细spring batch资料

    Spring Batch读取txt文件并写入数据库的方法教程

    主要给大家介绍了Spring Batch读取txt文件并写入数据库的方法,SpringBatch 是一个轻量级、全面的批处理框架。这里我们用它来实现文件的读取并将读取的结果作处理,处理之后再写入数据库中的功能。需要的朋友可以...

Global site tag (gtag.js) - Google Analytics