Camel, Java, Spring

How to read messages from Oracle AQ using a Camel route and save them to a data source?

How do we start when there is a need to handle a legacy system with a newer technology ?

This high-level code example shows how to connect to an Oracle Advanced Queue (AQ) in a target database, read messages from the queue, and then use Camel routes to run the business logic at the method level.

Note: Make sure to add the correct classes and dependencies via the Maven repository or your own defined repository. Some example imports are shown below:

import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import oracle.jms.AQjmsFactory;
import oracle.jdbc.pool.OracleDataSource;
import org.apache.camel.LoggingLevel;
import org.apache.camel.TypeConversionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.bean.validator.BeanValidationException;

First step where we will setup the connection factory and JmsConfiguration:

@Configuration
@EnableJms
@EnableConfigurationProperties(AqOracleDataSourceProperties.class)
@Profile("dev")
public class OracleAdvancedQueueConfig {

    private static final String JMS_CLIENT_ACKNOWLEDGEMENT_MODE = "CLIENT_ACKNOWLEDGE";

	//This should be the connection properties of the database that you are 
          reading from. 
    @Autowired
    private AqOracleDataSourceProperties aqOracleDataSourceProperties;


	//Using AQjmsFactory for Oracle Advanced Query aka AQ to create a 
          connection factory
    @Bean
    public QueueConnectionFactory connectionFactoryOracleAQQueue() throws JMSException, SQLException {

        return AQjmsFactory.getQueueConnectionFactory(oracleAqDataSource());
    }

	//Now creating Jms Configuration
    @Bean
    public JmsConfiguration oracleAqJmsConfiguration() throws Exception {

        JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(connectionFactoryOracleAQQueue());
        jmsConfiguration.setAcceptMessagesWhileStopping(true);
        jmsConfiguration.setFormatDateHeadersToIso8601(true);

        return jmsConfiguration;
    }

	//Now creating the Jms Component specific to camel Routes. Make sure the 
         method name matches with     the one you have in the properties,yaml etc.
    @Bean
    public JmsComponent oracleJmsAq() throws Exception {

        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConfiguration(oracleAqJmsConfiguration());

        return jmsComponent;
    }

    
    private DataSource oracleAqDataSource() throws SQLException {

        OracleDataSource oracleDataSource = new OracleDataSource();
        oracleDataSource.setUser(aqOracleDataSourceProperties.getUsername());
        oracleDataSource.setPassword(aqOracleDataSourceProperties.getPassword());
        oracleDataSource.setURL(aqOracleDataSourceProperties.getUrl());
        oracleDataSource.setImplicitCachingEnabled(true);
        oracleDataSource.setFastConnectionFailoverEnabled(true);

        return oracleDataSource;
    }

// Second step should be Camel Routes setup

**Make sure to get the dependency first**

**Make sure the name of the `JmsComponent` bean method (`oracleJmsAq` above) matches the JMS component name used in the route's endpoint URI**

/**
 * Camel route builder, active under the "dev" profile, that consumes from the
 * configured Oracle AQ endpoint and hands each message to a service method that saves it.
 */
@Component
@Profile("dev")
public class YourRoute extends RouteBuilder {

    /**
     * Registers the exception policies first and then defines the routes,
     * in the same order as they must be declared in the Camel DSL.
     */
    @Override
    public void configure() throws Exception {
        registerExceptionPolicies();
        defineRoutes();
    }

    /**
     * Declares how exceptions are dealt with. Replace this with your own logic
     * to catch whatever exceptions matter for your use case.
     */
    @SuppressWarnings("unchecked")
    private void registerExceptionPolicies() {

        // SQL failures: no redelivery, exception left unhandled, original message logged at WARN.
        onException(UncategorizedSQLException.class)
                .maximumRedeliveries(0)
                .handled(false)
                .useOriginalMessage()
                .log(LoggingLevel.WARN, logger, "Unable to save message: " + BODY_AND_STACKTRACE);

        // Everything else: redeliver with exponential back-off; the redelivery settings
        // are applied on the policy object obtained from getRedeliveryPolicy().
        onException(IOException.class, Exception.class)
                .handled(false)
                .useOriginalMessage()
                .logHandled(true)
                .getRedeliveryPolicy()
                .logRetryStackTrace(true)
                .retryAttemptedLogLevel(LoggingLevel.WARN) // generate email
                .retriesExhaustedLogLevel(LoggingLevel.WARN) // generate email
                .allowRedeliveryWhileStopping(false)
                .maximumRedeliveries(180)
                .redeliveryDelay(100)
                .maximumRedeliveryDelay(10000)
                .useExponentialBackOff();
    }

    /**
     * Defines the consuming route and the route that saves the message.
     * Make sure the referenced properties exist in your YAML (or wherever you keep them).
     */
    private void defineRoutes() {

        // Entry route: reads from the configured component/endpoint and forwards the message.
        from("{{app.some-camel-component}}{{app.oracle-aq-endpoint.name}}")
                .routeId(ROUTE_ID + "Dev")
                .to(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE);

        // Saving route: passes the body and the JMSTimestamp header to the service method.
        from(METHOD_WHERE_WE_WILL_SAVE_THE_MESSAGE)
                .routeId(ROUTE_ID)
                .bean(someServiceClassNameWhereYourMethodLives, "saveToDatabaseMethod(${body}, ${header.JMSTimestamp})");
    }
}

Let us know how this worked for you. If you have any questions, comment below or DM me on Twitter; we will be happy to pair program with you.

Leave a Reply

Your email address will not be published. Required fields are marked *