
Motivation

Mahmoud Ben Hassine edited this page Jul 14, 2020 · 2 revisions

This section is about the motivation behind creating Easy Batch, and its goal is to show how the framework can tremendously simplify batch application development. The use case is a typical ETL application that loads data from a CSV file into a relational database table. Here is the input file containing some product data:

#id,name,description,price,published,lastUpdate
0001,product1,description1,2500,true,2014-01-01
000x,product2,description2,2400,true,2014-01-01
0003,,description3,2300,true,2014-01-01
0004,product4,description4,-2200,true,2014-01-01
0005,product5,description5,2100,true,2024-01-01
0006,product6,description6,2000,true,2014-01-01,Blah!

Let's assume we have a JPA EntityManager used to persist Product objects to the database. Here is the Product POJO:

import java.util.Date;

public class Product {
    private long id;
    private String name;
    private String description;
    private double price;
    private boolean published;
    private Date lastUpdate;
    // getters, setters and toString() omitted
}

Before persisting products to the database, data must be validated to ensure that:

  • product id and name are specified
  • product price is not negative
  • product last update date is in the past
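
As a plain-Java sketch (stdlib only, class and method names are hypothetical, not part of Easy Batch), these three rules amount to:

```java
import java.time.LocalDate;

public class ProductChecks {

    // id and name must be present, price must be non-negative,
    // and the last update date must be strictly in the past
    public static boolean isValid(String id, String name, double price, LocalDate lastUpdate) {
        return id != null && !id.isEmpty()
                && name != null && !name.isEmpty()
                && price >= 0
                && lastUpdate.isBefore(LocalDate.now());
    }

    public static void main(String[] args) {
        // record 0001 from the sample file passes all three rules
        System.out.println(isValid("0001", "product1", 2500, LocalDate.of(2014, 1, 1)));
        // record 0004 fails: negative price
        System.out.println(isValid("0004", "product4", -2200, LocalDate.of(2014, 1, 1)));
    }
}
```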

Product names should also be persisted in uppercase. Finally, records starting with # should be ignored, primarily the header record (and possibly comments and a trailer record). To keep the example simple, we will write products data to the standard output rather than to a database. So let's get started!

The following listing is a possible solution that I (benas) have seen many times in production systems:

import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Scanner;

public class ProductJobWithoutEasyBatch {

    public static long nbFiltered = 0, nbIgnored = 0, nbRejected = 0, nbProcessed = 0;
    public static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        Scanner scanner = new Scanner(new File(args[0]));
        long currentRecordNumber = 0;
        while (scanner.hasNextLine()) {
            currentRecordNumber++;
            String record = scanner.nextLine();
            if (record.startsWith("#")) {
                System.err.println("record N°" + currentRecordNumber + " [" + record + "] filtered");
                nbFiltered++;
                continue;
            }
            String[] tokens = record.split(",");
            if (tokens.length != 6) {
                System.err.println("record N°" + currentRecordNumber
                    + " [" + record + "] ignored : unexpected record size " + tokens.length);
                nbIgnored++;
                continue;
            }
            Product product = new Product();
            String token = tokens[0];//product id
            if (token.isEmpty()) {
                rejectRecord(currentRecordNumber, record, "product Id is mandatory but was not specified");
                continue;
            }
            try {
                product.setId(Long.parseLong(token));
            } catch (NumberFormatException e) {
                rejectRecord(currentRecordNumber, record, "Unable to convert " + token + " to type long for field id");
                continue;
            }
            token = tokens[1];//product name
            if (token.isEmpty()) {
                rejectRecord(currentRecordNumber, record, "product name is mandatory but was not specified");
                continue;
            }
            product.setName(token);
            product.setDescription(tokens[2]); //product description
            token = tokens[3];//product price
            if (token.isEmpty()) {
                rejectRecord(currentRecordNumber, record, "product price is mandatory but was not specified");
                continue;
            }
            try {
                double price = Double.parseDouble(token);
                if (price < 0) {
                    rejectRecord(currentRecordNumber, record, "Product price must not be negative");
                    continue;
                }
                product.setPrice(price);
            } catch (NumberFormatException e) {
                rejectRecord(currentRecordNumber, record, "Unable to convert " + token + " to type double for field price");
                continue;
            }
            product.setPublished(Boolean.parseBoolean(tokens[4]));
            token = tokens[5];//product last update date
            try {
                Date lastUpdate = dateFormat.parse(token);
                if (lastUpdate.after(new Date())) {
                    rejectRecord(currentRecordNumber, record, "Last update date must be in the past");
                    continue;
                }
                product.setLastUpdate(lastUpdate);
            } catch (ParseException e) {
                rejectRecord(currentRecordNumber, record, "Unable to convert " + token + " to a date for field lastUpdate");
                continue;
            }
            // transform product data
            Product transformedProduct = new Product();
            transformedProduct.setId(product.getId());
            transformedProduct.setName(product.getName().toUpperCase());
            transformedProduct.setDescription(product.getDescription());
            transformedProduct.setPrice(product.getPrice());
            transformedProduct.setPublished(product.isPublished());
            transformedProduct.setLastUpdate(product.getLastUpdate());

            // save product to database here, to keep it simple I just write it to the standard output
            System.out.println("product = " + transformedProduct);
            nbProcessed++;
        }
        System.out.println("Job Report:");
        System.out.println("Job duration: " + (System.currentTimeMillis() - startTime) + "ms");
        System.out.println("total records: " + currentRecordNumber);
        System.out.println("nbFiltered: " + nbFiltered);
        System.out.println("nbIgnored: " + nbIgnored);
        System.out.println("nbRejected: " + nbRejected);
        System.out.println("nbProcessed: " + nbProcessed);
    }

    public static void rejectRecord(long currentRecordNumber, String record, String cause) {
        System.err.println("record N°" + currentRecordNumber + " [" + record + "] rejected : " + cause);
        nbRejected++;
    }

}

This solution actually works and implements the requirements above. But it is obviously a maintenance nightmare! It would be even worse if the Product POJO contained dozens of fields, which is often the case in production systems. Worse still, many people in the enterprise do not bother refactoring this kind of code into a generic solution (even a couple of abstract classes forming a mini in-house framework), so you end up with dozens if not hundreds of copy-pasted batch jobs like this one for other domain types.

Anyway, in the previous solution there are only a few lines which represent the batch job business logic. Do you see them? Here they are:

// transform product data
Product transformedProduct = new Product();
transformedProduct.setId(product.getId());
transformedProduct.setName(product.getName().toUpperCase());
transformedProduct.setDescription(product.getDescription());
transformedProduct.setPrice(product.getPrice());
transformedProduct.setPublished(product.isPublished());
transformedProduct.setLastUpdate(product.getLastUpdate());

// save product to database here, to keep it simple I just write it to the standard output
System.out.println("product = " + transformedProduct);

All the rest is boilerplate: handling IO; reading, filtering, parsing and validating data; type conversion; mapping records to Product instances; logging and reporting statistics at the end of execution. The idea behind Easy Batch is to handle all of this error-prone boilerplate code for you so that you can focus on your batch business logic. So let's see what the solution looks like with Easy Batch. First, let's create a RecordProcessor to implement the product transformation logic:

import org.jeasy.batch.core.processor.RecordProcessor;
import org.jeasy.batch.core.record.GenericRecord;
import org.jeasy.batch.core.record.Record;

public class ProductProcessor implements RecordProcessor<Product, Product> {

    @Override
    public Record<Product> processRecord(Record<Product> record) {
        Product product = record.getPayload();
        product.setName(product.getName().toUpperCase());
        return new GenericRecord<>(record.getHeader(), product);
    }
}

Then, we can declare (rather than implement, as in the solution above) data validation constraints on the Product POJO with the elegant Bean Validation API as follows:

import org.hibernate.validator.constraints.NotEmpty;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Past;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Product {

    @NotNull
    private long id;

    @NotEmpty
    private String name;

    private String description;

    @Min(0)
    private double price;

    private boolean published;

    @Past
    private Date lastUpdate;

    // getters, setters and toString() omitted

}

After that, we will use the StandardOutputRecordWriter to write the product to the standard output. To save the record to database in a real use case, we could use the JpaRecordWriter instead. Finally, we need to configure a job to:

  • Read data from the flat file products.csv
  • Filter records starting with #
  • Map each CSV record to an instance of the Product POJO
  • Validate product data
  • Process each record using the ProductProcessor implementation
  • Write records to the standard output with a StandardOutputRecordWriter

This can be done with the following snippet:

import java.nio.file.Paths;

import org.jeasy.batch.core.filter.StartsWithStringRecordFilter;
import org.jeasy.batch.core.job.Job;
import org.jeasy.batch.core.job.JobBuilder;
import org.jeasy.batch.core.job.JobExecutor;
import org.jeasy.batch.core.job.JobReport;
import org.jeasy.batch.core.writer.StandardOutputRecordWriter;
import org.jeasy.batch.flatfile.DelimitedRecordMapper;
import org.jeasy.batch.flatfile.FlatFileRecordReader;
import org.jeasy.batch.validation.BeanValidationRecordValidator;

public class ProductJobWithEasyBatch {

    public static void main(String[] args) {
        String[] fields = {"id", "name", "description", "price", "published", "lastUpdate"};
        Job job = new JobBuilder()
                .reader(new FlatFileRecordReader(Paths.get(args[0])))
                .filter(new StartsWithStringRecordFilter("#"))
                .mapper(new DelimitedRecordMapper<>(Product.class, fields))
                .validator(new BeanValidationRecordValidator())
                .processor(new ProductProcessor())
                .writer(new StandardOutputRecordWriter())
                .build();

        JobExecutor jobExecutor = new JobExecutor();
        JobReport report = jobExecutor.execute(job);
        jobExecutor.shutdown();

        System.out.println("job report = " + report);
    }

}
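
As mentioned above, writing to a database instead of the console is essentially a one-line change in the job configuration. Here is a sketch of that configuration fragment, assuming the easy-batch-jpa module is on the classpath and that a persistence unit named "productPU" (a hypothetical name) is defined for the Product entity:

```java
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.jeasy.batch.jpa.JpaRecordWriter;

// "productPU" is an assumed persistence unit name, not part of the example above
EntityManagerFactory entityManagerFactory = Persistence.createEntityManagerFactory("productPU");

Job job = new JobBuilder()
        // ... same reader, filter, mapper, validator and processor as above ...
        .writer(new JpaRecordWriter(entityManagerFactory))
        .build();
```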

If you do the math, it's about a 1/4 ratio (~25 vs ~100 LoC), which means we saved about 75% of the code by using Easy Batch! And actually even more: if we count the JMX monitoring, transaction management and batching that we get for free as well, we could save 90% or even 95% of the code.

That's all. As you can see, apart from implementing the core business logic, all we have done is provide configuration metadata that Easy Batch cannot guess. The framework takes care of all the boilerplate code of reading, filtering, parsing, validating and mapping data to domain objects, as well as reporting and logging. The code is easier to read, understand, test, (re)use and maintain.
