
Sunday, 1 March 2015

Hello Spring Batch

Batch processing is something we keep hearing about in the enterprise world. Most software projects will at some point require that certain tasks happen in the background at scheduled times. Such offline tasks constitute batch jobs: sending reminder emails to clients, sending job alerts to prospective candidates, fetching transactions from financial systems and loading them into analytical systems. Batch always creeps into the picture.
 The two main parts of a batch system are the scheduler and the batch mechanism. The scheduler is the code responsible for triggering jobs at the appropriate times. The batch mechanism is our code, which executes once triggered by the scheduler.
Just as Spring has crept in and standardized database operations, messaging and localization, it has also made its mark in the world of batch. While I do not have enough experience or background to explain the history and geography of the batch world, I do know it won't be long before I encounter a batch system that needs to be implemented. So I thought, why not give Spring Batch a dry run?
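To make the split between scheduler and batch mechanism concrete, here is a minimal sketch in plain Java with no Spring involved. A ScheduledExecutorService plays the scheduler and a Runnable plays the batch mechanism. The class name SchedulerSketch, the method runOnce and the delay values are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the scheduler decides WHEN, the batch job decides WHAT.
class SchedulerSketch {

    // Triggers the job once after the given delay and waits (up to 5 seconds)
    // for it to finish. Returns true if the job finished in time.
    static boolean runOnce(Runnable batchJob, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch finished = new CountDownLatch(1);
        try {
            scheduler.schedule(() -> {
                try {
                    batchJob.run();
                } finally {
                    finished.countDown();
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            return finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean done = runOnce(() -> System.out.println("Batch job triggered by the scheduler"), 50);
        System.out.println("Finished in time: " + done);
    }
}
```

In a real system the scheduler would more likely be cron or a scheduling library, and the Runnable would launch a Spring Batch job rather than print a line.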
(BTW for a good story about batch history check this one)
I decided to do a simple batch job that picks up a file from the system and loads it into memory. Not much of a job - but who cares so long as I understand Spring Batch.
The Spring Batch world has something called a Job Repository. This is the store where Spring Batch keeps the metadata about jobs and their runs: job instances, job executions and step executions.
<beans:bean id="transactionManager"
 class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">
</beans:bean>

<beans:bean id="jobRepository"
 class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
 <beans:property name="transactionManager" ref="transactionManager" />
</beans:bean>
The MapJobRepositoryFactoryBean configured here gives us an instance of SimpleJobRepository. The thing about Spring Batch is that it normally uses database tables to store information about the batch - BATCH_JOB_EXECUTION_SEQ, BATCH_JOB_SEQ, BATCH_STEP_EXECUTION_SEQ, BATCH_JOB_EXECUTION_CONTEXT just to name a few. These tables are used by Spring Batch to ensure correct execution and to provide a host of features which we shall not concern ourselves with now.
I didn't want any database, so I decided to use MapJobRepositoryFactoryBean, which creates a JobRepository that holds its data in memory - using maps, to be precise.
A note from Spring docs : "Note that the in-memory repository is volatile and so does not allow restart between JVM instances. It also cannot guarantee that two job instances with the same parameters are launched simultaneously, and is not suitable for use in a multi-threaded Job, or a locally partitioned Step. So use the database version of the repository wherever you need those features."
Since there is no database, the transaction manager is a ResourcelessTransactionManager, which provides transaction-like behavior without any underlying transactional resource. The database version of the repository factory is JobRepositoryFactoryBean. Once the repository is in place, we define a job that uses it, as below:
<job id="reportJob" job-repository="jobRepository">
 <step id="step1">
  <tasklet>
   <chunk reader="itemReader" writer="itemWriter" processor="personProcessor"
    commit-interval="2" />
  </tasklet>
 </step>
</job>
The job at its most basic is composed of a series of steps - here we have only one. Our only step reads the CSV file record by record, processes each record and then writes it to an ArrayList. The commit-interval of 2 means the records are handled in chunks of two. The input file is as below:
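As I understand it, a chunk-oriented step reads items until it has a chunk of commit-interval items, processes them, and then hands the whole chunk to the writer. The sketch below imitates that loop in plain Java. It is not Spring Batch's actual code, and the names ChunkSketch and run are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of chunk-oriented processing; not Spring Batch's implementation.
class ChunkSketch {

    // Reads up to commitInterval items, processes them, then "writes" them as one chunk.
    // Returns the chunks in the order they were written.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int commitInterval) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // Read phase: collect at most commitInterval items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: transform each item of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // Write phase: stands in for writer.write(chunk) followed by a commit.
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Robin", "Rohan", "Roman");
        List<List<String>> chunks = run(names.iterator(), name -> name.toUpperCase(), 2);
        System.out.println(chunks); // prints [[ROBIN, ROHAN], [ROMAN]]
    }
}
```

With three records and a chunk size of two, the first chunk carries two items and the second carries the remaining one, which matches the read=2, written=2 line in the console output further down.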
Robin,Varghese
Rohan,Naidu
Roman,Barlan
Spring is made up of beans, beans and then more beans - The data here would be best represented within the Spring world as a Bean. Accordingly I have defined a simple model class:
public class Person {
 private String lastName;
 private String firstName;
 // setter getters

}
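For completeness, here is a sketch of what the elided members could look like. The two-argument constructor is the one PersonItemProcessor calls later in this post, the no-argument constructor is there because BeanWrapperFieldSetMapper needs to instantiate the bean before setting its properties, and the toString format is inferred from the console output at the end. Everything else is an assumption on my part.

```java
// Hypothetical fuller version of Person. In the real project this would be a
// public class in its own Person.java file so that Spring can instantiate it.
class Person {
    private String lastName;
    private String firstName;

    // No-argument constructor, needed by bean-style mappers.
    public Person() {
    }

    // Convenience constructor, used by the processor.
    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    // Format inferred from the log lines, e.g. "firstName: Robin, lastName: Varghese".
    @Override
    public String toString() {
        return "firstName: " + firstName + ", lastName: " + lastName;
    }
}
```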
Each row in the file can be represented by a Person instance. Now what we need is a bean that will read the csv file and generate the Person instance for us:
<beans:bean id="fieldSetMapper"
 class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
 <beans:property name="targetType" value="org.robin.learn.spring.batch.data.Person" />
</beans:bean>

<beans:bean id="lineTokenizer"
 class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
 <beans:property name="names" value="firstName,lastName" />
</beans:bean>

<beans:bean id="lineMapper"
 class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
 <beans:property name="lineTokenizer" ref="lineTokenizer" />
 <beans:property name="fieldSetMapper" ref="fieldSetMapper" />
</beans:bean>

<beans:bean id="itemReader"
 class="org.springframework.batch.item.file.FlatFileItemReader">
 <beans:property name="resource" value="classpath:sample-data.csv" />
 <beans:property name="lineMapper" ref="lineMapper" />
</beans:bean>
Here we have configured FlatFileItemReader, an implementation of the ItemReader interface. From the Spring site:
"Strategy interface for providing the data. Implementations are expected to be 
stateful and will be called multiple times for each batch, with each call 
to read() returning a different value and finally returning null when all 
input data is exhausted. Implementations need not be thread-safe and clients 
of a ItemReader need to be aware that this is the case. "
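The contract in that quote (stateful, one item per call, null when the input is exhausted) can be shown with a small stand-in. The interface SketchReader and the class ListBackedReader below are hypothetical names of mine; the real interface is org.springframework.batch.item.ItemReader.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the reader contract described in the quote.
interface SketchReader<T> {
    // Returns the next item, or null once the input is exhausted.
    T read();
}

// A stateful reader over an in-memory list: each call advances the iterator.
class ListBackedReader<T> implements SketchReader<T> {
    private final Iterator<T> iterator;

    ListBackedReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

class ReaderContractDemo {
    public static void main(String[] args) {
        SketchReader<String> reader =
                new ListBackedReader<>(Arrays.asList("Robin,Varghese", "Rohan,Naidu"));
        String line;
        // The caller keeps reading until it sees null.
        while ((line = reader.read()) != null) {
            System.out.println("Read: " + line);
        }
        System.out.println("Input exhausted");
    }
}
```

Because the reader keeps its position in a field, two threads sharing one instance could interfere with each other, which is why the quote warns that implementations need not be thread-safe.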
The ItemReader is composed of three main parts:
  • a resource which is the input data to process 
  • a lineTokenizer which will split the input line into separate tokens and associate them with keys - thus creating a set of key value pairs. 
  • a FieldSetMapper which will instantiate the target bean (Person) and set the properties from the key value pairs generated in the previous step. 
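The last two parts can be pictured as a two-step pipeline: first the line becomes a set of named values, then the named values are copied onto a bean. Below is a rough plain-Java imitation of that idea. It is not how DelimitedLineTokenizer or BeanWrapperFieldSetMapper are implemented, and NameRecord, tokenize and mapFields are names I made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical target type, standing in for the Person bean.
class NameRecord {
    String firstName;
    String lastName;
}

class LineMappingSketch {

    // Step 1 (the lineTokenizer role): split the line and pair each token with a name.
    static Map<String, String> tokenize(String line, String... names) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "Expected " + names.length + " tokens but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i]);
        }
        return fields;
    }

    // Step 2 (the fieldSetMapper role): create the target object and copy the named values.
    static NameRecord mapFields(Map<String, String> fields) {
        NameRecord target = new NameRecord();
        target.firstName = fields.get("firstName");
        target.lastName = fields.get("lastName");
        return target;
    }

    public static void main(String[] args) {
        Map<String, String> fields = tokenize("Robin,Varghese", "firstName", "lastName");
        NameRecord person = mapFields(fields);
        System.out.println(fields); // prints {firstName=Robin, lastName=Varghese}
        System.out.println(person.firstName + " " + person.lastName); // prints Robin Varghese
    }
}
```

The real mapper matches field names to bean properties generically, which is why the names given to the tokenizer (firstName, lastName) have to line up with the setters on Person.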
Once the data has been read, we need to process it - i.e. deal with the Person instances created by our FlatFileItemReader.
import org.springframework.batch.item.ItemProcessor;
import org.robin.learn.spring.batch.data.Person;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

 // Returning null indicates that the item should be filtered out,
 // i.e. it is not passed on to the writer.
 @Override
 public Person process(final Person person) throws Exception {
  final String firstName = person.getFirstName().toUpperCase();
  final String lastName = person.getLastName().toUpperCase();
  final Person transformedPerson = new Person(firstName, lastName);
  System.out.println("Converting (" + person + ") into ("
    + transformedPerson + ")");
  return transformedPerson;
 }

}
This is the transformation class. It receives each Person object, transforms it (in this case by converting the values to upper case) and returns the result. The last step is saving this data. I didn't really need a database for this example; an in-memory solution was good enough. So:
<beans:bean id="itemWriter"
 class="org.springframework.batch.item.support.ListItemWriter" />
This completes the code for step1 of reportJob.
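One detail from the processor is worth a quick illustration: the comment about returning null. My processor never does that, but a processor that returns null for an item effectively filters it out, so it never reaches the writer. The plain-Java sketch below imitates that behavior; FilterSketch and processAndCollect are hypothetical names, and the blank-name rule is invented just for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "null means filtered"; not Spring Batch's implementation.
class FilterSketch {

    // Applies the processor to each item and collects only the non-null results,
    // the way a list-backed writer would only ever see unfiltered items.
    static <I, O> List<O> processAndCollect(List<I> items, Function<I, O> processor) {
        List<O> written = new ArrayList<>();
        for (I item : items) {
            O result = processor.apply(item);
            if (result != null) {
                written.add(result);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Robin", "", "Roman");
        // Invented rule: blank names are filtered out, the rest are upper-cased.
        List<String> written = processAndCollect(names, n -> n.isEmpty() ? null : n.toUpperCase());
        System.out.println(written); // prints [ROBIN, ROMAN]
    }
}
```

In the console output further down, the filtered=0 counter in the StepContribution line reflects that none of my three records were dropped this way.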
public static void main(String[] args) {

      ApplicationContext context = new ClassPathXmlApplicationContext("spring-config.xml");
      JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
      Job job = (Job) context.getBean("reportJob");

      try {
         JobExecution execution = jobLauncher.run(job, new JobParameters());
         System.out.println("Exit Status : " + execution.getStatus());
      } catch (JobExecutionAlreadyRunningException | JobRestartException
            | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
         e.printStackTrace();
      }

      List<? extends Person> results = ((ListItemWriter<Person>) context.getBean("itemWriter"))
            .getWrittenItems();
      for (Person person : results) {
         System.out.println("Found <" + person + "> in the result.");
      }
      ((ConfigurableApplicationContext) context).close();
      System.out.println("Done");
   }
As can be seen here, to run the job we needed an instance of JobLauncher. To summarize:
  • A JobLauncher executes a Job, passing it a JobParameters object, and records the run in the JobRepository. 
  • For the executed job, the JobLauncher returns a JobExecution object, which is like a report. 
  • Every job is composed of one or more steps, each of which can be composed of read, process and write operations. 
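The launcher, repository and parameters fit together roughly as in the toy model below. It also hints at why main catches JobInstanceAlreadyCompleteException: as far as I understand, a job name plus its parameters identifies a job instance, and an instance that has already completed is not run again. This is plain Java of my own making (LaunchSketch, Status and run are hypothetical names), not the Spring Batch classes, and it uses IllegalStateException where Spring Batch would use its own exception.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy launcher; only an illustration of the roles, not Spring Batch code.
class LaunchSketch {

    enum Status { COMPLETED, FAILED }

    // Plays the repository role: remembers which job instances already completed.
    private final Set<String> completedInstances = new HashSet<>();

    // Runs the steps in order and reports the outcome.
    Status run(String jobName, Map<String, String> parameters, List<Runnable> steps) {
        // A job instance is identified by the job name plus its (sorted) parameters.
        String instanceKey = jobName + new TreeMap<>(parameters);
        if (completedInstances.contains(instanceKey)) {
            throw new IllegalStateException("Job instance already complete: " + instanceKey);
        }
        try {
            for (Runnable step : steps) {
                step.run();
            }
        } catch (RuntimeException e) {
            return Status.FAILED; // a failed instance may be launched again
        }
        completedInstances.add(instanceKey);
        return Status.COMPLETED;
    }

    public static void main(String[] args) {
        LaunchSketch launcher = new LaunchSketch();
        Runnable step1 = () -> System.out.println("Executing step: [step1]");
        List<Runnable> steps = Collections.singletonList(step1);

        Map<String, String> noParameters = new HashMap<>();
        System.out.println("Exit Status : " + launcher.run("reportJob", noParameters, steps));

        // Launching again with different parameters creates a new job instance.
        Map<String, String> otherParameters = new HashMap<>();
        otherParameters.put("run.id", "2");
        System.out.println("Exit Status : " + launcher.run("reportJob", otherParameters, steps));
    }
}
```

My example gets away with empty JobParameters because the in-memory repository starts fresh with every JVM. With a database-backed repository, a second run would need different parameters.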
The xml configuration is as below:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
   xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

   <beans:bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
      <beans:property name="targetType"
         value="org.robin.learn.spring.batch.data.Person" />
   </beans:bean>

   <beans:bean id="lineTokenizer"
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
      <beans:property name="names" value="firstName,lastName" />
   </beans:bean>

   <beans:bean id="lineMapper"
      class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <beans:property name="lineTokenizer" ref="lineTokenizer" />
      <beans:property name="fieldSetMapper" ref="fieldSetMapper" />
   </beans:bean>

   <beans:bean id="itemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
      <beans:property name="resource" value="classpath:sample-data.csv" />
      <beans:property name="lineMapper" ref="lineMapper" />
   </beans:bean>


   <beans:bean id="itemWriter"
      class="org.springframework.batch.item.support.ListItemWriter" />

   <beans:bean id="personProcessor"
      class="org.robin.learn.spring.batch.PersonItemProcessor" />


   <job id="reportJob" job-repository="jobRepository">
      <step id="step1">
         <tasklet>
            <chunk reader="itemReader" writer="itemWriter"
               processor="personProcessor" commit-interval="2" />
         </tasklet>
      </step>
   </job>

   <beans:bean id="transactionManager"
      class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">
   </beans:bean>

   <beans:bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
      <beans:property name="transactionManager" ref="transactionManager" />
   </beans:bean>

   <beans:bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
      <beans:property name="jobRepository" ref="jobRepository" />
   </beans:bean>

</beans:beans>
The console output:
13:58:13.678 [main] INFO  o.s.b.f.xml.XmlBeanDefinitionReader - Loading XML bean definitions 
from class path resource [spring-config.xml]
...
13:58:14.258 [main] DEBUG o.s.t.i.NameMatchTransactionAttributeSource - Adding transactional method 
[*] with attribute [PROPAGATION_REQUIRED,ISOLATION_DEFAULT]
13:58:14.258 [main] DEBUG o.s.t.i.NameMatchTransactionAttributeSource - Adding transactional method 
[create*] with attribute [PROPAGATION_REQUIRES_NEW,ISOLATION_SERIALIZABLE]
13:58:14.258 [main] DEBUG o.s.t.i.NameMatchTransactionAttributeSource - Adding transactional method 
[getLastJobExecution*] with attribute [PROPAGATION_REQUIRES_NEW,ISOLATION_SERIALIZABLE]
...
13:58:14.278 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Finished creating instance of bean 'jobRepository'
...
13:58:14.368 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Creating instance of bean 'reportJob'
...
13:58:14.378 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Creating instance of bean 'jobLauncher'
13:58:14.398 [main] DEBUG o.s.b.s.t.ResourcelessTransactionManager - Creating new transaction with 
name [org.springframework.batch.core.repository.support.SimpleJobRepository.getLastJobExecution]:
PROPAGATION_REQUIRES_NEW,ISOLATION_SERIALIZABLE
13:58:14.448 [main] DEBUG o.s.b.s.t.ResourcelessTransactionManager - Initiating transaction commit
13:58:14.448 [main] DEBUG o.s.b.s.t.ResourcelessTransactionManager - Committing resourceless transaction on
[org.springframework.batch.support.transaction.ResourcelessTransactionManager$ResourcelessTransaction@6b7fd49c]
13:58:14.508 [main] INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [FlowJob: [name=reportJob]] 
launched with the following parameters: [{}]
13:58:14.508 [main] DEBUG o.s.batch.core.job.AbstractJob - Job execution starting: JobExecution: id=0, 
version=0, startTime=null, endTime=null,
lastUpdated=Thu Feb 19 13:58:14 CST 2015, status=STARTING, exitStatus=exitCode=UNKNOWN;
exitDescription=, job=[JobInstance: id=0, version=0, Job=[reportJob]], jobParameters=[{}]
13:58:14.518 [main] DEBUG o.s.b.s.t.ResourcelessTransactionManager - Creating new transaction with name
[org.springframework.batch.core.repository.support.SimpleJobRepository.update]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
...
13:58:14.538 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [step1]
13:58:14.538 [main] DEBUG o.s.batch.core.step.AbstractStep - Executing: id=1
Converting (firstName: Robin, lastName: Varghese) into (firstName: ROBIN, lastName: VARGHESE)
Converting (firstName: Rohan, lastName: Naidu) into (firstName: ROHAN, lastName: NAIDU)
13:58:14.578 [main] DEBUG o.s.b.c.s.item.ChunkOrientedTasklet - Inputs not busy, ended: false
13:58:14.578 [main] DEBUG o.s.b.core.step.tasklet.TaskletStep - Applying contribution: [
StepContribution: read=2, written=2, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
...
Converting (firstName: Roman, lastName: Barlan) into (firstName: ROMAN, lastName: BARLAN)
13:58:14.588 [main] DEBUG o.s.batch.core.step.AbstractStep - Step execution success: id=1
...
13:58:14.608 [main] INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [FlowJob: [name=reportJob]] 
completed with the following parameters: [{}] and the following status: [COMPLETED]
Exit Status : COMPLETED
Found <firstName: ROBIN, lastName: VARGHESE> in the result.
Found <firstName: ROHAN, lastName: NAIDU> in the result.
Found <firstName: ROMAN, lastName: BARLAN> in the result.
Done

3 comments:

  2. Hi Robin
    Nice and simple explanation.
    Can you please explain me use of below tag?


    Here you have mentioned value="org.robin.learn.spring.batch.data.Person". What are the possible values we can use instead of "org.robin.learn.spring.batch.data.Person"?

    Replies
    1. beans:property name="targetType"
      value="org.robin.learn.spring.batch.data.Person"
