Saturday, January 31, 2015

Spring Integration - Bulk processing Example

In this post, I'm going to share my experience of using Spring Integration for a bulk processing task. This was my first time using Spring Integration, and it did not disappoint. It was robust in terms of error handling and it scaled well.

Disclaimer:

The actual task has been recast as a fictitious e-commerce task, since I cannot reveal the original, which is customer specific and confidential. However, the nature of the task (processing CLOB and XML data) remains the same, and the complexity of the original task has been retained.

Also, the views expressed in this post are strictly personal.


Objective:

The objective is to take customer information from an e-commerce application, process it, and save it into a database as CLOB data.

The task is to read XML from a file in the file system, process it, and then store it as a CLOB in a table.

The following are the high-level tasks to be performed:
  1. There will be one XML file for each customer, and the file name is prefixed with the corresponding customer ID.
  2. The XML contains the customer's personal information. The file is named customerID_personal.xml, for example 1000_personal.xml.
  3. Read the personal info file (say, 1000_personal.xml) from the file system.
  4. Perform some transformation on the XML.
  5. Perform some more processing on the resulting XML and then save it into a database table as CLOB data against the customer ID.
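
The naming convention in the list above can be captured in a small helper. This is only a sketch in plain Java: the class name PersonalInfoFiles and its methods are hypothetical, and it assumes the customer ID is handled as a string.

```java
import java.nio.file.Path;

final class PersonalInfoFiles {
    // Suffix taken from the naming convention customerID_personal.xml.
    private static final String SUFFIX = "_personal.xml";

    /** Builds the expected file name for a customer, e.g. "1000" gives "1000_personal.xml". */
    static String fileNameFor(String customerId) {
        return customerId + SUFFIX;
    }

    /** Resolves the file inside a base directory (the directory layout is an assumption). */
    static Path pathFor(Path baseDir, String customerId) {
        return baseDir.resolve(fileNameFor(customerId));
    }

    /** Recovers the customer ID from a file name that follows the convention. */
    static String customerIdOf(String fileName) {
        if (!fileName.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("Not a personal info file: " + fileName);
        }
        return fileName.substring(0, fileName.length() - SUFFIX.length());
    }
}
```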

Constraints:

  1. The volume of files is high: roughly half a million or more.
  2. We need to track which files were processed successfully and which ones failed, with reliable error handling.
  3. This is not a real-time processing application, i.e., files are dumped into the file system by a separate system and the processing can happen offline.

Design Approach:

  1. The customer IDs are initially saved into a table (say ALL_CUSTOMERS) with a status column (STATUS_ID), whose values are enumerated as NOT_STARTED, IN PROGRESS, ERROR and COMPLETE.
  2. Instead of polling the files in the file system, we will poll the ALL_CUSTOMERS table to get the records which are in NOT_STARTED status and process them.
  3. When we start processing each record, we will update it to IN PROGRESS.
  4. If there is any error during processing, we will update it to ERROR.
  5. If there is no error during processing, we will update it to COMPLETE.
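
The status lifecycle described above could be modelled as a small state machine. The following is a sketch, not the code from the original task: the enum name CustomerStatus and its methods are hypothetical, and IN PROGRESS is written IN_PROGRESS because Java identifiers cannot contain spaces.

```java
import java.util.EnumSet;
import java.util.Set;

enum CustomerStatus {
    NOT_STARTED, IN_PROGRESS, ERROR, COMPLETE;

    /** Statuses that may follow this one according to the design approach. */
    Set<CustomerStatus> allowedNext() {
        switch (this) {
            case NOT_STARTED:
                return EnumSet.of(IN_PROGRESS);      // picked up by the poller
            case IN_PROGRESS:
                return EnumSet.of(ERROR, COMPLETE);  // processing failed or finished
            default:
                return EnumSet.noneOf(CustomerStatus.class); // ERROR and COMPLETE are final here
        }
    }

    /** Returns the next status, or throws if the transition is not part of the design. */
    CustomerStatus transitionTo(CustomerStatus next) {
        if (!allowedNext().contains(next)) {
            throw new IllegalStateException("Illegal transition: " + this + " -> " + next);
        }
        return next;
    }
}
```

Treating ERROR as a final state is an assumption of this sketch; a retry policy would need an extra transition from ERROR back to NOT_STARTED.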

Use of Spring Integration to solve this problem:

The following diagram shows the process flow.


Step 1 - Polling records from database:

The goal is to poll records which are in NOT_STARTED status and place them on a channel.
  1. Create an inbound channel adapter which polls the database at regular intervals. The inbound adapter can be a JDBC adapter, with a poller associated to it that sets the polling interval.
  2. The JDBC adapter:
    • Has a query attribute, where we can specify the query to be run when the poller wakes up.
    • Has a row-mapper attribute, where we can specify a row mapper that turns each record retrieved from the database into a POJO.
    • Has an “update” attribute, where we can specify a query which will be run after the records are polled. We will use this to update the status of the polled records to “IN PROGRESS”.
  3. The inbound channel adapter connects to a channel, “dbInputChannel”, where it places the data retrieved from the database.
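
To make the query-then-update behaviour of the poll concrete, here is a plain Java stand-in that uses an in-memory map in place of the ALL_CUSTOMERS table. It does not use the Spring Integration JDBC adapter; the class CustomerPoller and the maxRows parameter are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CustomerPoller {
    // In-memory stand-in for ALL_CUSTOMERS: customer ID -> STATUS_ID value.
    private final Map<String, String> statusByCustomerId;

    CustomerPoller(Map<String, String> statusByCustomerId) {
        this.statusByCustomerId = statusByCustomerId;
    }

    /**
     * One poll cycle: select up to maxRows records in NOT_STARTED status
     * (the "query") and mark each of them IN PROGRESS (the "update"),
     * so that the next poll does not pick them up again.
     */
    synchronized List<String> poll(int maxRows) {
        List<String> claimed = new ArrayList<>();
        for (Map.Entry<String, String> row : statusByCustomerId.entrySet()) {
            if (claimed.size() == maxRows) {
                break;
            }
            if ("NOT_STARTED".equals(row.getValue())) {
                row.setValue("IN PROGRESS");
                claimed.add(row.getKey());
            }
        }
        return claimed;
    }
}
```

Marking the records as part of the same poll is what keeps a later poll from handing out the same customer twice.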

Step 2 – Arrange for parallel processing of each of these records

Connect the dbInputChannel to a record splitter, which splits the polled result into individual records and puts each one on another channel, “storageDataFileReadChannel”. In effect, each record read from the database gets its own thread, and the records are processed in parallel.
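
The split-and-process-in-parallel idea can be sketched with the standard java.util.concurrent classes. This is not the Spring Integration splitter: the class RecordFanOut is hypothetical, and it swaps one thread per record for a fixed-size thread pool, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class RecordFanOut {
    /** Submits each record as its own task and collects the results in input order. */
    static <T, R> List<R> processInParallel(List<T> records, Function<T, R> handler, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T record : records) {
                Callable<R> task = () -> handler.apply(record); // one task per record
                futures.add(pool.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // waits for that record to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for records", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A record failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```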


Step 3 – Use of Headers

Place each record read into the message headers (which act like a session context), so that the customer ID is available to all channels in the workflow until the end.
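
The reason headers work as a session context is that they travel with the message while the payload changes from step to step. A minimal plain Java illustration follows; SimpleMessage is a hypothetical stand-in, not the Spring message type, and the header name customerId is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SimpleMessage<T> {
    private final T payload;
    private final Map<String, Object> headers;

    SimpleMessage(T payload, Map<String, ?> headers) {
        this.payload = payload;
        // Defensive copy so the headers cannot be changed by a later step.
        this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
    }

    T getPayload() {
        return payload;
    }

    Map<String, Object> getHeaders() {
        return headers;
    }

    /** Replaces the payload (e.g. DB row -> XML text) while carrying the headers along. */
    <R> SimpleMessage<R> withPayload(R newPayload) {
        return new SimpleMessage<>(newPayload, headers);
    }
}
```

With this shape, a step that turns the database record into XML can still look up the customer ID from the headers when it later needs to update the status.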


Step 4 – Read Personal Info file from file system.

Create a service activator (which is essentially a Java service class) that reads data from a channel, invokes the service method, and then places the result on an output channel.

  • Input channel : richStorageDataFileReadChannel
  • Service method: fileReadAdapter.readFile
  • Output channel: richStorageDataOutputChannel
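
A service class behind this step might look roughly like the sketch below. Only the names fileReadAdapter and readFile come from the configuration above; the method signature, the base directory and the UTF-8 encoding are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileReadAdapter {
    private final Path baseDir; // directory where the separate system drops the files (assumed)

    FileReadAdapter(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Reads customerID_personal.xml for the given customer and returns its content. */
    String readFile(String customerId) {
        Path file = baseDir.resolve(customerId + "_personal.xml");
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Rethrow unchecked so the failure can surface to an error handler.
            throw new UncheckedIOException("Cannot read " + file, e);
        }
    }
}
```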

Step 5 – Perform XSLT transformation on the personal info data

Create a Transformer which will act on the personal info data, apply XSLT transformation and place the output on an output channel.
  • Input channel : transactionSourceChannel
  • XSLT: xsl/customTransformation.xsl
  • Output channel: transactionOutputChannel
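
For readers who have not applied XSLT from Java before, this is roughly what the transformer does under the hood, written against the standard javax.xml.transform API. The class XsltStep is hypothetical, and the stylesheet is passed in as a string here rather than loaded from xsl/customTransformation.xsl.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

final class XsltStep {
    /** Applies the given XSLT stylesheet to the XML text and returns the result as a string. */
    static String transform(String xml, String xslt) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            // Invalid XML or a broken stylesheet ends up here.
            throw new IllegalStateException("XSLT transformation failed", e);
        }
    }
}
```

Compiling the stylesheet on every call keeps the sketch short; for high volumes, compiling it once into a javax.xml.transform.Templates object and reusing that would be the usual choice.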

Step 6 – Perform processing logic on the transformed data

Create a service activator (which is essentially a Java service class) that reads data from a channel, invokes the service method, and then places the result on an output channel. This service method also saves the data back into the database and marks the status of the record in the ALL_CUSTOMERS table as COMPLETE.
  • Input channel : transactionOutputChannel
  • Service method: transactionService.processData
  • Output channel: processedTransactionChannel

Error handling:

A global error handler is used, which sets the record status to “ERROR”. This error handler is invoked if an error occurs at any step in the workflow.
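
The effect of such a handler is that one bad record is marked and the rest keep flowing. The sketch below shows that behaviour in plain Java with a try/catch around each record; it is not the Spring Integration error channel, and the class ErrorTrackingRunner is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class ErrorTrackingRunner {
    /**
     * Runs the pipeline for every customer ID. A failure marks only that
     * record as ERROR; the remaining records are still processed.
     */
    static Map<String, String> runAll(List<String> customerIds, Consumer<String> pipeline) {
        Map<String, String> statusByCustomerId = new LinkedHashMap<>();
        for (String customerId : customerIds) {
            try {
                pipeline.accept(customerId);
                statusByCustomerId.put(customerId, "COMPLETE");
            } catch (RuntimeException e) {
                // Stand-in for the global error handler: record the failure and move on.
                statusByCustomerId.put(customerId, "ERROR");
            }
        }
        return statusByCustomerId;
    }
}
```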

Results, Performance:

Overall, the workflow worked like a charm. Errors did occur occasionally, mainly due to invalid XML in the file system caused by special characters.
However, the error handling mechanism took care of them: the record was marked as ERROR and, importantly, the process did not stop there.


The speed was good too, with a processing rate of 120 records in 5 minutes (on a not-so-high-end processor, and with the original task involving two different file reads per record). Note that I had set the timer to poll the database every 5 minutes, so it could process 120 records in parallel before the next poll started.

Conclusion:

I would recommend using Spring Integration because of its ease of use and robustness. It is well documented too, and is a good fit for bulk processing.
