Camel as ActiveMQ client (jms,activemq components)
In this tutorial, we will learn how to send and receive messages to and from ActiveMQ using Camel.
Example Scenario:
Case 1 : Camel as basic ActiveMQ client.
In this scenario, we have to read messages from a queue “firstQueue” and send it to a queue named “secondQueue”.
Case 2 : Receiving messages from ActiveMQ using connection pooling in Camel.
In this scenario, we have to receive messages from a topic “firstTopic” as 3 concurrent Camel consumers using connection pooling and print the message to console.
Case 3 : Using Message Listener to receive messages from ActiveMQ from Camel.
This is also a basic message consumer, but it uses Message Listener as consumer in Camel.
Project Artifacts:
1. Technologies used –
a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Camel 2.16.0
d. Java 1.8
e. ActiveMQ 5.12.0
2. Eclipse project folder structure:
3. Jars Used
If you are not using Maven, here is the list of jars you would need for this project:
camel-core-2.16.0.jar
slf4j-api-1.6.6.jar
jaxb-core-2.2.11.jar
jaxb-impl-2.2.11.jar
activemq-camel-5.12.0.jar
camel-jms-2.15.2.jar
spring-jms-4.1.6.RELEASE.jar
spring-messaging-4.1.6.RELEASE.jar
spring-beans-4.1.6.RELEASE.jar
activemq-spring-5.12.0.jar
xbean-spring-3.18.jar
commons-logging-1.0.3.jar
activemq-broker-5.12.0.jar
activemq-openwire-legacy-5.12.0.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
commons-pool2-2.3.jar
activemq-pool-5.12.0.jar
activemq-jms-pool-5.12.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
activemq-client-5.12.0.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
camel-spring-2.16.0.jar
spring-context-4.1.6.RELEASE.jar
spring-core-4.1.6.RELEASE.jar
spring-expression-4.1.6.RELEASE.jar
spring-aop-4.1.6.RELEASE.jar
aopalliance-1.0.jar
spring-tx-4.1.6.RELEASE.jar
camel-stream-2.16.0.jar
4. Maven Dependencies
file:pom.xml . Paste the below content to pom.xml. It has camel and ActiveMQ dependencies. It can pass two arguments – “java” for running camel in Java DSL route and “xml” for running as Spring routes.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javanbeyond</groupId> <artifactId>CamelWorkingWithMessages</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.16.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.12.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>2.16.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-stream</artifactId> <version>2.16.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.1.1</version> <executions> <execution> <phase>test</phase> <goals> <goal>java</goal> </goals> <configuration> <mainClass>com.javanbeyond.MainClass</mainClass> <arguments> <argument>java</argument> <!--<argument>xml</argument> --> </arguments> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
5. Main Class
file: MainClass.java . Main class is our starting point and is called by maven. It can run Java DSL or Spring DSL based on argument passed. Also, we have to comment the case we don’t want to run.
package com.javanbeyond; import org.apache.camel.CamelContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class MainClass { public static void main(String[] args) throws Exception { if (args.length < 1) { System.err.println("Please pass java or xml as argument"); System.exit(1); } if (args[0].equalsIgnoreCase("java")) { // executeJavaDSL("getSendMessageQueue"); // executeJavaDSL("usePoolingInActiveMQ"); executeJavaDSL("usingMessageListenerWithActiveMQ"); } else if (args[0].equalsIgnoreCase("xml")) { // executeSpringDSL("getSendMessageQueue"); executeSpringDSL("usePoolingInActiveMQ"); // executeSpringDSL("usingMessageListenerWithActiveMQ"); } else { System.out.println("Argument not supported."); System.exit(1); } while (true) { } } @SuppressWarnings("resource") public static void executeSpringDSL(String routeId) throws Exception { // Starting specific route of xml new ClassPathXmlApplicationContext("camel-context.xml") .getBean("camel", CamelContext.class).startRoute(routeId); } public static void executeJavaDSL(String routeId) throws Exception { new CamelJavaDSL().getCamelContext().startRoute(routeId); } }
6a. Java DSL class
file: CamelJavaDSL.java . This class contains three routes for three different cases of listening to ActiveMQ.
Case 1: “getSendMessageQueue“
In this case, the broker url is passed to ActiveMQ component and then, it is added to Camel Context. This route gets all the messages from “firstQueue” and send it to “secondQueue”.
Case 2 : “usePoolingInActiveMQ”
In this example, initially three concurrent consumers are created whose number can go up to 10. Connections are being pooled. It will be helpful to clients that has to deal with huge volume of messages.
Case 3 : “usingMessageListenerWithActiveMQ”
This example uses Message Listener to get messages from queue.
package com.javanbeyond; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.component.jms.JmsConfiguration; import org.apache.camel.impl.DefaultCamelContext; public class CamelJavaDSL extends RouteBuilder { private static CamelContext context; CamelJavaDSL() throws Exception { context = new DefaultCamelContext(); // This component is for simply reading and writing to Queues context.addComponent("activemqSimple", getActiveMQSimpleComponent()); // It used connection pooling to listen to Topic context.addComponent("activemqPool", getActiveMQPoolComponent()); context.addComponent("jms", getJmsComponent()); context.addRoutes(this); context.start(); } @Override public void configure() throws Exception { // Take messages from firstQueue and send it to secondQueue // For topic we need to use the word "topic" after ActiveMQ // scheme from("activemqSimple:queue:firstQueue").autoStartup(false).routeId("getSendMessageQueue") .to("activemqSimple:queue:secondQueue"); // Read messages from topic with 3 consumers. // concurrentConsumers attribute can also be used to define // number of consumers from("activemqPool:topic:firstTopic?concurrentConsumers=3").autoStartup(false).routeId("usePoolingInActiveMQ") .to("stream:out"); // if you don't mention queue or topic after ActiveMQ scheme, // the default value taken is queue. // Setting the listener class from("jms:firstQueue").autoStartup(false).routeId("usingMessageListenerWithActiveMQ") .bean(new ActiveMQMessageListener()); } public ActiveMQConnectionFactory getConnectionFactory() { return new ActiveMQConnectionFactory("tcp://localhost:61616"); } public CamelContext getCamelContext() { return context; } public JmsComponent getJmsComponent() { // Create JmsComponent with connectionFactory return JmsComponent.jmsComponentAutoAcknowledge(getConnectionFactory()); } // This component will be used for simple activemq connection public ActiveMQComponent getActiveMQSimpleComponent() { return ActiveMQComponent.activeMQComponent("tcp://localhost:61616"); } // This component will be used for connection pooling public ActiveMQComponent getActiveMQPoolComponent() { ActiveMQComponent activeMQComponent = new ActiveMQComponent(); activeMQComponent.setConfiguration(getJmsConfiguration()); return activeMQComponent; } public JmsConfiguration getJmsConfiguration() { JmsConfiguration jmsConfiguration = new JmsConfiguration(); // Once all the messages are sent or received, the client send // acknowledgement to ActiveMQ jmsConfiguration.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); jmsConfiguration.setTransacted(false); // It will start at 3 parallel consumers jmsConfiguration.setConcurrentConsumers(3); jmsConfiguration.setConnectionFactory(getPooledConnectionFactory()); return jmsConfiguration; } public PooledConnectionFactory getPooledConnectionFactory() { PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); // A maximum of 10 connections can be opened on high volume of messages pooledConnectionFactory.setMaxConnections(10); pooledConnectionFactory.setConnectionFactory(getConnectionFactory()); return pooledConnectionFactory; } }
6b. Camel Spring XML DSL
file:camel-context.xml. Spring XML equivalent to Java DSL .
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <!-- Disable automatic startup of routes --> <route id="getSendMessageQueue" autoStartup="false"> <!-- Take messages from firstQueue and send it to secondQueue. For topic we need to use the word "topic" after activemq scheme --> <from uri="activemqSimple:queue:firstQueue" /> <to uri="activemqSimple:queue:secondQueue" /> </route> <route id="usePoolingInActiveMQ" autoStartup="false"> <!-- concurrentConsumers attribute can also be used to define number of consumers. Read messages from topic with 3 consumers. --> <from uri="activemqPool:topic:firstTopic?concurrentConsumers=3" /> <to uri="stream:out" /> </route> <route id="usingMessageListenerWithActiveMQ" autoStartup="false"> <from uri="jms:firstQueue" /> <bean ref="msgList" /> </route> </camelContext> <!-- Create connection factory to connect to ActiveMQ broker --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <!-- Pooled Connection Factory --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <!-- A maximum of 10 connections can be opened on high volume --> <property name="maxConnections" value="10" /> <property name="connectionFactory" ref="connectionFactory" /> </bean> <bean id="jmsConfiguration" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory" /> <property name="transacted" value="false" /> <!-- It will start at 3 parallel consumers --> <property name="concurrentConsumers" value="3" /> <!-- Once all the messages are sent or received, the client send acknowledgement to activemq --> <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE" /> </bean> <bean id="activemqSimple" class="org.apache.activemq.camel.component.ActiveMQComponent"> </bean> <bean id="activemqPool" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfiguration" /> </bean> <!-- For route 3 usingMessageListenerWithActiveMQ--> <bean id="jms" class="org.apache.camel.component.jms.JmsComponent"> <property name="connectionFactory" ref="connectionFactory" /> </bean> <bean id="msgList" class="com.javanbeyond.ActiveMQMessageListener" /> </beans>
Example Execution:
Case 1:
- Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “getSendMessageQueue” as argument and commenting other methods.
- Start ActiveMQ and click on Queues tab.
3. Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.
4. On ActiveMQ queues tab, click on “Send To” in Operations column across “firstQueue”. Type any thing in the body of the message.
3. Click on Send.
4. Click on “secondQueue”. You should see the same message in this queue now.
Case 2:
- Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “usePoolingInActiveMQ” as argument and commenting other methods.
- Start ActiveMQ and click on Topics tab. Click on “firstTopic”.
- Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.
- Enter any text of your choice in the message body.
4. Click on Send. You should see the same text on eclipse console three times as the number of consumers we started with is three.
Case 3:
- Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “usingMessageListenerWithActiveMQ” as argument and commenting other methods.
- Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.
- On ActiveMQ queues tab, click on “Send To” in Operations column across “firstQueue”. Type any thing in the body of the message.
3. You should see the same message in your eclipse console.