How to read from Oracle AQ using a Camel route and save the message to a data source?
Where do we start when a legacy system has to be handled with a newer technology?
This high-level code example shows how to connect to Oracle Advanced Queuing (AQ) on a target database, read messages from the queue, and then use Camel routes to run the business logic at the method level.
Note: Make sure to add the correct classes and dependencies from the Maven repository or your own repository. Some example imports are below:
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import oracle.jms.AQjmsFactory;
import oracle.jdbc.pool.OracleDataSource;
import org.apache.camel.LoggingLevel;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.bean.validator.BeanValidationException;
First step: set up the connection factory and the JmsConfiguration:
@Configuration
@EnableJms
@EnableConfigurationProperties(AqOracleDataSourceProperties.class)
@Profile("dev")
public class OracleAdvancedQueueConfig {
    private static final String JMS_CLIENT_ACKNOWLEDGEMENT_MODE = "CLIENT_ACKNOWLEDGE";
    // These should be the connection properties of the database that you are reading from.
    @Autowired
    private AqOracleDataSourceProperties aqOracleDataSourceProperties;
    // Use AQjmsFactory to create a connection factory for Oracle Advanced Queuing (AQ).
    @Bean
    public QueueConnectionFactory connectionFactoryOracleAQQueue() throws JMSException, SQLException {
        return AQjmsFactory.getQueueConnectionFactory(oracleAqDataSource());
    }
    // Create the JMS configuration on top of the AQ connection factory.
    @Bean
    public JmsConfiguration oracleAqJmsConfiguration() throws Exception {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(connectionFactoryOracleAQQueue());
        jmsConfiguration.setAcceptMessagesWhileStopping(true);
        jmsConfiguration.setFormatDateHeadersToIso8601(true);
        return jmsConfiguration;
    }
    // Create the JmsComponent used by the Camel routes. The bean (method) name,
    // oracleJmsAq, must match the component name used in the endpoint URI that
    // you keep in your properties, YAML, etc.
    @Bean
    public JmsComponent oracleJmsAq() throws Exception {
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConfiguration(oracleAqJmsConfiguration());
        return jmsComponent;
    }
    // Build the Oracle data source for the AQ database from the injected properties.
    private DataSource oracleAqDataSource() throws SQLException {
        OracleDataSource oracleDataSource = new OracleDataSource();
        oracleDataSource.setUser(aqOracleDataSourceProperties.getUsername());
        oracleDataSource.setPassword(aqOracleDataSourceProperties.getPassword());
        oracleDataSource.setURL(aqOracleDataSourceProperties.getUrl());
        oracleDataSource.setImplicitCachingEnabled(true);
        oracleDataSource.setFastConnectionFailoverEnabled(true);
        return oracleDataSource;
    }
}
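The configuration class above injects an AqOracleDataSourceProperties bean that this post does not show. A minimal sketch of what it could look like follows, assuming it only needs to carry the three values the data source reads (getUrl, getUsername, getPassword). In a Spring Boot application it would typically also be annotated with @ConfigurationProperties; the prefix mentioned in the comment is hypothetical, and the annotation is left out here so the sketch compiles without Spring.

```java
// Hypothetical sketch of the properties holder used by OracleAdvancedQueueConfig.
// In the real application this class would usually be public and carry something like
// @ConfigurationProperties(prefix = "aq.datasource")  (the prefix is an assumption).
class AqOracleDataSourceProperties {
    private String url;       // JDBC URL of the database that hosts the queue
    private String username;  // database user that is allowed to dequeue
    private String password;  // password for that user

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
```

The getter names are the only part taken from the post; everything else is an assumption you should adapt to your own property layout.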
Second step: set up the Camel routes:
**Make sure the required dependencies are on the classpath first**
**Make sure the JmsComponent bean method name matches the component name used in the route's endpoint URI**
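To make the naming rule concrete: Camel treats the part of an endpoint URI before the first colon as the component name, and the JmsComponent bean defined earlier is registered under its method name, oracleJmsAq. The sketch below is plain Java, not Camel's own property resolver. It only imitates how the two `{{...}}` placeholders used in the route could expand, and checks that the resulting scheme equals the bean name. The property values (`oracleJmsAq:` and `queue:MY_AQ_QUEUE`) are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative stand-in for placeholder resolution; not part of Camel.
class EndpointUriSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

    // Replaces every {{key}} in the template with its value from props.
    static String resolve(String template, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String value = props.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + matcher.group(1));
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Returns the component name, i.e. the text before the first colon.
    static String scheme(String uri) {
        return uri.substring(0, uri.indexOf(':'));
    }

    public static void main(String[] args) {
        // Hypothetical property values, as they might appear in application.yaml.
        Map<String, String> props = Map.of(
                "app.some-camel-component", "oracleJmsAq:",
                "app.oracle-aq-endpoint.name", "queue:MY_AQ_QUEUE");

        String uri = resolve("{{app.some-camel-component}}{{app.oracle-aq-endpoint.name}}", props);
        System.out.println(uri);                               // oracleJmsAq:queue:MY_AQ_QUEUE
        System.out.println(scheme(uri).equals("oracleJmsAq")); // true
    }
}
```

If the scheme and the bean name differ, Camel has no component registered under that name and the route will not start.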
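@Component
@Profile("dev")
public class YourRoute extends RouteBuilder {
    // logger, BODY_AND_STACKTRACE, ROUTE_ID, METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE and
    // someServiceClassNameWhereYourMethodLives are placeholders to define in your own class.
    @SuppressWarnings("unchecked")
    @Override
    public void configure() throws Exception {
        // Put your own exception handling here.
        // SQL failures are not retried: log the original message and leave the exception unhandled.
        onException(UncategorizedSQLException.class)
            .maximumRedeliveries(0)
            .handled(false)
            .useOriginalMessage()
            .log(LoggingLevel.WARN, logger, "Unable to save message: " + BODY_AND_STACKTRACE);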
        // Any other exception is retried up to 180 times with exponential back-off,
        // starting at 100 ms and capped at 10 s between attempts.
        onException(IOException.class, Exception.class)
            .handled(false)
            .useOriginalMessage()
            .logHandled(true)
            .getRedeliveryPolicy()
            .logRetryStackTrace(true)
            .retryAttemptedLogLevel(LoggingLevel.WARN) // generate email
            .retriesExhaustedLogLevel(LoggingLevel.WARN) // generate email
            .allowRedeliveryWhileStopping(false)
            .maximumRedeliveries(180)
            .redeliveryDelay(100)
            .maximumRedeliveryDelay(10000)
            .useExponentialBackOff();
        // Make sure these properties are defined in YAML or elsewhere, depending on your needs.
        from("{{app.some-camel-component}}{{app.oracle-aq-endpoint.name}}")
            .routeId(ROUTE_ID + "Dev")
            .to(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE);
        // Hand the message body and its JMS timestamp to the service method that saves it.
        from(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE)
            .routeId(ROUTE_ID)
            .bean(someServiceClassNameWhereYourMethodLives, "saveToDatabaseMethod(${body}, ${header.JMSTimestamp})");
    }
}
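The route passes the message body and the JMSTimestamp header to a bean method that this post leaves out. A minimal sketch of such a service follows, assuming plain JDBC, a text payload, and a hypothetical table AQ_MESSAGES with columns PAYLOAD and RECEIVED_AT. The class name MessageStore is made up as well. JMSTimestamp is a long holding milliseconds since the epoch, so it is converted to a java.sql.Timestamp before the insert.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.sql.DataSource;

// Hypothetical service behind saveToDatabaseMethod; in a real application it would be
// a public class registered as a Spring bean so the Camel route can call it.
class MessageStore {
    // Table and column names are assumptions for illustration only.
    static final String INSERT_SQL =
            "INSERT INTO AQ_MESSAGES (PAYLOAD, RECEIVED_AT) VALUES (?, ?)";

    private final DataSource dataSource;

    MessageStore(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // JMSTimestamp is milliseconds since the epoch; Timestamp takes the same unit.
    static Timestamp toTimestamp(long jmsTimestampMillis) {
        return new Timestamp(jmsTimestampMillis);
    }

    // Writes one row per message; the connection and statement are closed automatically.
    public void saveToDatabaseMethod(String body, long jmsTimestamp) throws SQLException {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
            statement.setString(1, body);
            statement.setTimestamp(2, toTimestamp(jmsTimestamp));
            statement.executeUpdate();
        }
    }
}
```

With plain JDBC like this, a failed insert surfaces as SQLException and would fall under the generic Exception handler in the route. If you save through Spring's JdbcTemplate instead, failures are translated into DataAccessException subclasses such as UncategorizedSQLException, which is what the first onException block is written for.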
Let us know how this worked for you. If you have any questions, comment below or DM me on Twitter; we will be happy to pair program with you.