Apache Camel – JDBC with Spring Transaction support
Continuing my journey with Fuse ESB (ServiceMix) and Apache Camel I will present a small example on how to route your messages in a transacted way using Spring Transaction Manager.
For the purpose of this tutorial we are going to define a datasource pointing to an in-memory HSQLDB instance and register it with ‘org.springframework.jdbc.datasource.DataSourceTransactionManager’. Our Camel example is based on the tutorial/document from the FuseSource website itself, which can be found at: http://fusesource.com/docs/router/2.5/transactions/index.html.
Our application will do the following:
- Create a database table in the in-memory HSQLDB database and populate it with some data – we are going to use JdbcTemplate for this purpose
- The route itself will pick up messages from the specified directory, one by one; one of the messages will contain data that causes an exception, which rolls back our transaction and the database changes made so far.
- After each message is processed successfully, a dump of our database will be printed to the console
- If an exception occurs and the transaction is rolled back, an appropriate log entry will be displayed stating the current database content
The full source code for this example, packaged as a maven based project can be downloaded from the Github website.
1. We are going to use two Spring beans for our example. The first one receives the datasource through its constructor and populates our database with data:
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Creates the {@code accounts} table in the supplied datasource and inserts
 * two initial rows ("Major Clanger" with 2000 and "Tiny Clanger" with 100).
 *
 * <p>The table is set up as a side effect of construction, so simply
 * instantiating this class with a datasource is enough to prepare the database.
 */
public class CreateTable {
    private static final Logger log = Logger.getLogger(CreateTable.class);

    protected DataSource dataSource;
    protected JdbcTemplate jdbc;

    /**
     * Stores the datasource and immediately creates and populates the table.
     *
     * @param ds datasource in which the {@code accounts} table is created
     */
    public CreateTable(DataSource ds) {
        log.info("CreateTable constructor called");
        // Assign the field directly and call a private helper: invoking
        // overridable methods from a constructor would run subclass code
        // before the subclass has been initialised.
        this.dataSource = ds;
        createAndPopulate();
    }

    /**
     * @return the datasource this bean operates on
     */
    public DataSource getDataSource() {
        return dataSource;
    }

    /**
     * @param dataSource the datasource used by subsequent {@link #setUpTable()} calls
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Creates the {@code accounts} table in the current datasource and inserts
     * the two initial rows. The constructor already performs this work once.
     */
    public void setUpTable() {
        createAndPopulate();
    }

    /** Builds a JdbcTemplate for the current datasource, creates the table and seeds it. */
    private void createAndPopulate() {
        log.info("About to set up table...");
        jdbc = new JdbcTemplate(dataSource);
        jdbc.execute("create table accounts (name varchar(50), amount int)");
        insertAccount("Major Clanger", 2000);
        insertAccount("Tiny Clanger", 100);
        log.info("Table created");
    }

    /** Inserts a single row into the {@code accounts} table. */
    private void insertAccount(String name, int amount) {
        jdbc.update("insert into accounts (name,amount) values (?,?)",
                new Object[] {name, amount});
    }
}
The other bean is responsible for our route logic and performs the database operations. The datasource for this bean is injected through a property (see the camel-context.xml file below for more details):
import java.util.List;
import javax.sql.DataSource;
import org.apache.camel.Exchange;
import org.apache.camel.language.XPath;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Bean invoked from the Camel route to move money between rows of the
 * {@code accounts} table and to dump the table content into the message body.
 *
 * <p>The datasource is supplied through {@link #setDataSource(DataSource)};
 * it must be set before any of the other methods are called.
 */
public class AccountService {
    private static final Logger log = Logger.getLogger(AccountService.class);

    /** Largest amount that a single {@link #debit(String, String)} call accepts. */
    private static final int DEBIT_LIMIT = 100;

    private JdbcTemplate jdbc;

    public AccountService() {
    }

    /**
     * Creates the JdbcTemplate used for all database access of this bean.
     *
     * @param ds datasource holding the {@code accounts} table
     */
    public void setDataSource(DataSource ds) {
        jdbc = new JdbcTemplate(ds);
    }

    /**
     * Adds a specific amount of money to a named account.
     *
     * @param name   account name, taken from {@code /transaction/transfer/receiver}
     * @param amount amount to add, taken from {@code /transaction/transfer/amount}
     * @throws NumberFormatException if {@code amount} is not a valid integer
     */
    public void credit(@XPath("/transaction/transfer/receiver/text()") String name,
                       @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("credit() called with args name = " + name + " and amount = " + amount);
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?", new Object[] { name });
        int newAmount = origAmount + Integer.parseInt(amount);
        jdbc.update("update accounts set amount = ? where name = ?", new Object[] { newAmount, name });
    }

    /**
     * Subtracts a specific amount of money from a named account.
     *
     * @param name   account name, taken from {@code /transaction/transfer/sender}
     * @param amount amount to subtract, taken from {@code /transaction/transfer/amount}
     * @throws IllegalArgumentException if {@code amount} exceeds the debit limit of 100,
     *                                  or if the account would end up with a negative balance
     * @throws NumberFormatException    if {@code amount} is not a valid integer
     */
    public void debit(@XPath("/transaction/transfer/sender/text()") String name,
                      @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        // Parse once and reuse the value for both the limit check and the subtraction.
        int debitAmount = Integer.parseInt(amount);
        if (debitAmount > DEBIT_LIMIT) {
            throw new IllegalArgumentException("Debit limit is " + DEBIT_LIMIT);
        }
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?", new Object[] { name });
        int newAmount = origAmount - debitAmount;
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }
        jdbc.update("update accounts set amount = ? where name = ?", new Object[] { newAmount, name });
    }

    /**
     * Reads every row of the {@code accounts} table and stores the string form
     * of the resulting list as the body of the exchange's In message.
     *
     * @param ex exchange whose In message body is replaced with the table dump
     */
    public void dumpTable(Exchange ex) {
        log.info("dump() called");
        List<?> dump = jdbc.queryForList("select * from accounts");
        ex.getIn().setBody(dump.toString());
    }
}
Our datasource, transaction manager and injection points for our beans are defined in camel-context.xml file located in ‘resources/META-INF/spring’ directory:
<!-- Configures the Camel Context-->
<!-- Spring application context for the example: declares the Camel context, the
     transaction manager, the datasource and the two tutorial beans. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<!-- Camel context; the 'tutorial' package is presumably where the route builder
     class lives (TODO confirm against the project layout) -->
<camelContext xmlns="http://camel.apache.org/schema/spring">
<package>tutorial</package>
</camelContext>
<!-- spring transaction manager -->
<!-- Bound to the same 'dataSource' bean that the tutorial beans use -->
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- datasource to the database -->
<!-- HSQLDB in-memory database named 'camel', user 'sa' with an empty password -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<property name="driverClass" value="org.hsqldb.jdbcDriver"/>
<property name="url" value="jdbc:hsqldb:mem:camel"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
</bean>
<!-- Bean to initialize table in the DB -->
<!-- The datasource is passed as a constructor argument; the constructor sets up the table -->
<bean id="createTable" class="tutorial.jdbc.CreateTable">
<constructor-arg ref="dataSource" />
</bean>
<!-- Bean for account service -->
<!-- The datasource is injected through the 'dataSource' property (setDataSource) -->
<bean id="accountService" class="tutorial.jdbc.AccountService">
<property name="dataSource" ref="dataSource"/>
</bean>
</beans>
2 and 3. When our configuration and the spring beans are ready we can start coding our route:
// Read input files from src/data; with noop=true a file is neither moved nor deleted
from("file:src/data?noop=true")
    // everything that follows runs as part of one transaction
    .transacted()
    // call the account service bean: credit first, then debit, then dump the table
    .beanRef("accountService", "credit")
    .beanRef("accountService", "debit")
    .beanRef("accountService", "dumpTable")
    // write the resulting message to the ExampleRouter log
    .to("log:ExampleRouter");
4. As a simple error handling mechanism I have decided to use ‘onException’, which is executed every time an exception is thrown in our route; the transaction is then rolled back:
// Error handling for IllegalArgumentException: no redelivery attempts, the
// exception is marked as handled, the current table content is dumped into the
// message, the message is appended to target/messages/deadLetters.xml and the
// transaction is flagged for rollback.
onException(IllegalArgumentException.class)
    .maximumRedeliveries(0)
    .handled(true)
    .beanRef("accountService", "dumpTable")
    .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
    .markRollbackOnly();
Remember to use SpringRouteBuilder for our transacted route instead of the plain RouteBuilder class. Below is the full source of our route configuration using the Java DSL:
import org.apache.camel.spring.SpringRouteBuilder;
/**
* A Camel Router
*
* @version $
*/
public class MyRouteBuilder extends SpringRouteBuilder {
public void configure() {
//handle exceptions and log them
onException(IllegalArgumentException.class).
maximumRedeliveries(0)
.handled(true)
.beanRef("accountService", "dumpTable")
.to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
.markRollbackOnly();
//our route definition
//noop option - if true, the file is not moved or deleted in any way
from("file:src/data?noop=true")
//mark the route as transacted
.transacted()
//execute spring bean methods
.beanRef("accountService","credit")
.beanRef("accountService","debit")
.beanRef("accountService","dumpTable")
//log the result
.to("log:ExampleRouter");
}
}
To run the example, navigate into the project folder and execute the following command: