Camel as ActiveMQ client (jms,activemq components)

In this tutorial, we will learn how to send and receive messages to and from ActiveMQ using Camel.

Example Scenario:

Case 1 : Camel as basic ActiveMQ client.

In this scenario, we have to read messages from a queue “firstQueue” and send it to a queue named “secondQueue”.

Case 2 : Receiving messages from ActiveMQ using connection pooling in Camel.

In this scenario, we have to receive messages from a topic “firstTopic” as 3 concurrent Camel consumers using connection pooling and print the message to console.

Case 3 : Using Message Listener to receive messages from ActiveMQ from Camel.

This is also a basic message consumer, but it uses Message Listener as consumer in Camel.

Project Artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Camel 2.16.0
d. Java 1.8
e. ActiveMQ 5.12.0

2. Eclipse project folder structure:

CamelActiveMQClientProjStruc

3. Jars Used

If you are not using Maven, here is the list of jars you would need for this project:

camel-core-2.16.0.jar
slf4j-api-1.6.6.jar
jaxb-core-2.2.11.jar
jaxb-impl-2.2.11.jar
activemq-camel-5.12.0.jar
camel-jms-2.15.2.jar
spring-jms-4.1.6.RELEASE.jar
spring-messaging-4.1.6.RELEASE.jar
spring-beans-4.1.6.RELEASE.jar
activemq-spring-5.12.0.jar
xbean-spring-3.18.jar
commons-logging-1.0.3.jar
activemq-broker-5.12.0.jar
activemq-openwire-legacy-5.12.0.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
commons-pool2-2.3.jar
activemq-pool-5.12.0.jar
activemq-jms-pool-5.12.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
activemq-client-5.12.0.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
camel-spring-2.16.0.jar
spring-context-4.1.6.RELEASE.jar
spring-core-4.1.6.RELEASE.jar
spring-expression-4.1.6.RELEASE.jar
spring-aop-4.1.6.RELEASE.jar
aopalliance-1.0.jar
spring-tx-4.1.6.RELEASE.jar
camel-stream-2.16.0.jar

4. Maven Dependencies

file:pom.xml . Paste the below content to pom.xml. It has camel and ActiveMQ dependencies. It can pass two arguments – “java” for running camel in Java DSL route and “xml” for running as Spring routes.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>CamelWorkingWithMessages</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.16.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-camel</artifactId>
			<version>5.12.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-spring</artifactId>
			<version>2.16.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-stream</artifactId>
			<version>2.16.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.MainClass</mainClass>
							<arguments>
								<argument>java</argument>
								<!--<argument>xml</argument> -->
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

5. Main Class

file: MainClass.java . Main class is our starting point and is called by maven. It can run Java DSL or Spring DSL based on argument passed. Also, we have to comment the case we don’t want to run.

package com.javanbeyond;

import org.apache.camel.CamelContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MainClass {

  public static void main(String[] argsthrows Exception {
    if (args.length < 1) {
      System.err.println("Please pass java or xml as argument");
      System.exit(1);
    }
    if (args[0].equalsIgnoreCase("java")) {
      // executeJavaDSL("getSendMessageQueue");
      // executeJavaDSL("usePoolingInActiveMQ");
      executeJavaDSL("usingMessageListenerWithActiveMQ");
    else if (args[0].equalsIgnoreCase("xml")) {
      // executeSpringDSL("getSendMessageQueue");
      executeSpringDSL("usePoolingInActiveMQ");
      // executeSpringDSL("usingMessageListenerWithActiveMQ");
    else {
      System.out.println("Argument not supported.");
      System.exit(1);
    }
    while (true) {

    }
  }

  @SuppressWarnings("resource")
  public static void executeSpringDSL(String routeIdthrows Exception {
    // Starting specific route of xml
    new ClassPathXmlApplicationContext("camel-context.xml")
        .getBean("camel", CamelContext.class).startRoute(routeId);
  }

  public static void executeJavaDSL(String routeIdthrows Exception {
    new CamelJavaDSL().getCamelContext().startRoute(routeId);
  }

}

6a. Java DSL class

file: CamelJavaDSL.java . This class contains three routes for three different cases of listening to ActiveMQ.

Case 1: “getSendMessageQueue

In this case, the broker url  is passed to ActiveMQ component and then, it is added to Camel Context. This route gets all the messages from “firstQueue” and send it to “secondQueue”.

Case 2 : “usePoolingInActiveMQ”

In this example, initially three concurrent consumers are created whose number can go up to 10. Connections are being pooled. It will be helpful to clients that has to deal with huge volume of messages.

Case 3 : “usingMessageListenerWithActiveMQ”

This example uses Message Listener to get messages from queue.

package com.javanbeyond;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelJavaDSL extends RouteBuilder {
  private static CamelContext context;

  CamelJavaDSL() throws Exception {
    context = new DefaultCamelContext();
    // This component is for simply reading and writing to Queues
    context.addComponent("activemqSimple", getActiveMQSimpleComponent());
    // It used connection pooling to listen to Topic
    context.addComponent("activemqPool", getActiveMQPoolComponent());
    context.addComponent("jms", getJmsComponent());
    context.addRoutes(this);
    context.start();
  }

  @Override
  public void configure() throws Exception {
    // Take messages from firstQueue and send it to secondQueue
    // For topic we need to use the word "topic" after ActiveMQ
    // scheme
    from("activemqSimple:queue:firstQueue").autoStartup(false).routeId("getSendMessageQueue")
     .to("activemqSimple:queue:secondQueue");
    // Read messages from topic with 3 consumers.
    // concurrentConsumers attribute can also be used to define
    // number of consumers

    from("activemqPool:topic:firstTopic?concurrentConsumers=3").autoStartup(false).routeId("usePoolingInActiveMQ")
     .to("stream:out");

    // if you don't mention queue or topic after ActiveMQ scheme,
    // the default value taken is queue.
    // Setting the listener class
    from("jms:firstQueue").autoStartup(false).routeId("usingMessageListenerWithActiveMQ")
     .bean(new ActiveMQMessageListener());
  }

  public ActiveMQConnectionFactory getConnectionFactory() {
    return new ActiveMQConnectionFactory("tcp://localhost:61616");
  }

  public CamelContext getCamelContext() {
    return context;
  }

  public JmsComponent getJmsComponent() {
    // Create JmsComponent with connectionFactory
    return JmsComponent.jmsComponentAutoAcknowledge(getConnectionFactory());
  }

  // This component will be used for simple activemq connection
  public ActiveMQComponent getActiveMQSimpleComponent() {
    return ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
  }

  // This component will be used for connection pooling
  public ActiveMQComponent getActiveMQPoolComponent() {
    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setConfiguration(getJmsConfiguration());
    return activeMQComponent;
  }

  public JmsConfiguration getJmsConfiguration() {
    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    // Once all the messages are sent or received, the client send
    // acknowledgement to ActiveMQ
    jmsConfiguration.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
    jmsConfiguration.setTransacted(false);
    // It will start at 3 parallel consumers
    jmsConfiguration.setConcurrentConsumers(3);
    jmsConfiguration.setConnectionFactory(getPooledConnectionFactory());
    return jmsConfiguration;
  }

  public PooledConnectionFactory getPooledConnectionFactory() {
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    // A maximum of 10 connections can be opened on high volume of messages
    pooledConnectionFactory.setMaxConnections(10);
    pooledConnectionFactory.setConnectionFactory(getConnectionFactory());
    return pooledConnectionFactory;
  }

}

6b. Camel Spring XML DSL

file:camel-context.xml. Spring XML equivalent to Java DSL .

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
	<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
	<!--  Disable automatic startup of routes -->
		<route id="getSendMessageQueue" autoStartup="false">
		<!--  Take messages from firstQueue and send it to secondQueue.
				 For topic we need to use the word "topic" after activemq
				 scheme -->
			<from uri="activemqSimple:queue:firstQueue" />
			<to uri="activemqSimple:queue:secondQueue" />
		</route>
		<route id="usePoolingInActiveMQ" autoStartup="false">
		<!--  concurrentConsumers attribute can also be used to define
				 number of consumers.
				 Read messages from topic with 3 consumers. -->
			<from uri="activemqPool:topic:firstTopic?concurrentConsumers=3" />
			<to uri="stream:out" />
		</route>
		<route id="usingMessageListenerWithActiveMQ" autoStartup="false">
			<from uri="jms:firstQueue" />
			<bean ref="msgList" />
		</route>
	</camelContext>
	<!-- 		 Create connection factory to connect to ActiveMQ broker
	 -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />		
	</bean>
	<!-- Pooled Connection Factory -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
	<!--  A maximum of 10 connections can be opened on high volume -->
		<property name="maxConnections" value="10" />
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<bean id="jmsConfiguration" class="org.apache.camel.component.jms.JmsConfiguration">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="transacted" value="false" />
		<!--  It will start at 3 parallel consumers -->
		<property name="concurrentConsumers" value="3" />
		<!--  Once all the messages are sent or received, the client send
		 acknowledgement to activemq -->
		<property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE" />
	</bean>
	<bean id="activemqSimple" class="org.apache.activemq.camel.component.ActiveMQComponent">
	</bean>
	<bean id="activemqPool" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="configuration" ref="jmsConfiguration" />
	</bean>
	
	<!-- For route 3 usingMessageListenerWithActiveMQ-->
	<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<bean id="msgList" class="com.javanbeyond.ActiveMQMessageListener" />
</beans>

Example Execution:

Case 1:

  1. Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “getSendMessageQueue” as argument and commenting other methods.
  2. Start ActiveMQ and click on Queues tab.

CamelActiveMQClientQueues

3. Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.

4. On ActiveMQ queues tab, click on “Send To” in Operations column across “firstQueue”. Type any thing in the body of the message.

CamelActiveMQClientSendMessages

3. Click on Send.

4. Click on “secondQueue”. You should see the same message in this queue now.

CamelActiveMQClientsecondQueueMsg

Case 2:

  1. Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “usePoolingInActiveMQ” as argument and commenting other methods.
  2. Start ActiveMQ and click on Topics tab. Click on “firstTopic”.
  3. Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.
  4. Enter any text of your choice in the message body.

CamelActiveMQClientConnectionPooling

4. Click on Send. You should see the same text on eclipse console three times as the number of consumers we started with is three.

CamelActiveMQClientEclipseConsole

Case 3:

  1. Make changes to Main class by calling either executeJavaDSL or executeSpringDSL with “usingMessageListenerWithActiveMQ” as argument and commenting other methods.
  2. Run the example by calling maven clean and install and passing either “xml” or “java” to the Main Class.
  3. On ActiveMQ queues tab, click on “Send To” in Operations column across “firstQueue”. Type any thing in the body of the message.

MessageListenerCamelActiveMQClient

3. You should see the same message in your eclipse console.

CamelActiveMQClientListenerConsole

 

 

Leave a Reply

Back to Top
%d bloggers like this: