Category Archives : ACTIVEMQ

Tomcat + Spring + ActiveMQ

In this tutorial, we are going to learn how to use Queues and Topics as managed resources in Tomcat as client and use Spring to listen and send messages to/from ActiveMQ. We are going to create a war application for listening to queues in ActiveMQ.

Example Scenario:

We have two queues in ActiveMQ – firstQueue and secondQueue. Whenever we send a sentence as message to “firstQueue”, it should be consumed by our client and thereafter, the characters  and their number of occurrences in the sentence should reach “secondQueue” as a message.

Project Artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.12.0
e. Spring 4.2.2.RELEASE

2. Eclipse project folder structure:

ActiveMQSpringJNDITomcatProjStruc

3. Jars Used

If you are not using Maven, here is the list of jars you would need for this project:

spring-jms-4.2.2.RELEASE.jar
spring-aop-4.2.2.RELEASE.jar
aopalliance-1.0.jar
spring-beans-4.2.2.RELEASE.jar
spring-context-4.2.2.RELEASE.jar
spring-expression-4.2.2.RELEASE.jar
spring-core-4.2.2.RELEASE.jar
commons-logging-1.2.jar
spring-messaging-4.2.2.RELEASE.jar
spring-tx-4.2.2.RELEASE.jar
spring-web-4.2.2.RELEASE.jar
activemq-pool-5.12.0.jar
slf4j-api-1.7.10.jar
activemq-jms-pool-5.12.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
activemq-client-5.12.0.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
commons-pool2-2.3.jar

4. Maven Dependencies

file:pom.xml . Paste the below content to pom.xml. It has activemq and spring dependencies.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>TomcatSpringActiveMQClient</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>TomcatSpringActiveMQClient Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.12.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>TomcatSpringActiveMQClient</finalName>
    </build>
</project>

5. Deployment Descriptor

file:web.xml.

<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
    <display-name>Archetype Created Web Application</display-name>
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:applicationContext.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
</web-app>

6. Tomcat changes for JNDI

Add the following entry to Tomcat context.xml in <TOMCAT_HOME>/config folder as a child of Context root tag . This entry instructs Tomcat to enter these resources in JNDI which would be “looked up” by the application with the same JNDI name. Please note that we have used ActiveMQ pooling connection factory for better performance and scalability.

			<Resource name="jms/activemqConnFact" auth="Container" type="org.apache.activemq.pool.PooledConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://localhost:61616" brokerName="localhost" maxConnections="10"></Resource>
<Resource name="jms/sendToQueue" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" description="My Sending Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="secondQueue"></Resource>
<Resource name="jms/receiveFromQueue" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" description="My Receiving Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="firstQueue"></Resource>

7. Spring Application Context.

file:applicationContext.xml. This is the main spring configuration file for our application to listen to ActiveMQ queues. If you are not using JNDI for some reason, please replace the JNDI lookup spring tags with bean declaration of connection factory and queues.

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd             http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd             http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd             http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <!-- To recognize Autowire annotation in listener class -->
    <context:annotation-config></context:annotation-config>
    <!-- Template class for sending messages. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="activemqConnectionFactory"></constructor-arg>
        <property name="sessionTransacted" value="false"></property>
        <property name="defaultDestination" ref="jmsTemplateDest"></property>
        <property name="deliveryPersistent" value="false"></property>
    </bean>
    <bean id="messageListener" class="com.javanbeyond.MyMessageListenerSender"></bean>
    <!-- Listener class which gets any message that arrives at the queue "FirstQueue" -->
    <jms:listener-container container-type="default" connection-factory="activemqConnectionFactory" acknowledge="auto" destination-resolver="myDestinationResolver">
        <!-- The destination should match the id of JNDI lookup. The Destination Resolver decides how to resolve the destination. In this case, it would look for an id with the destination value in Spring application Context. If no destination resolver is mentioned, the name will be taken as Queue or Topic name of Message provider -->
        <jms:listener destination="jmsReceiveJndi" ref="messageListener" concurrency="3-5"></jms:listener>
    </jms:listener-container>
    <bean id="myDestinationResolver" class="org.springframework.jms.support.destination.BeanFactoryDestinationResolver"></bean>
    <!-- Getting connection factory and destination queues from JNDI.Topics can similarly be obtained, if needed -->
    <jee:jndi-lookup id="activemqConnectionFactory" jndi-name="jms/activemqConnFact" expected-type="javax.jms.ConnectionFactory"></jee:jndi-lookup>
    <jee:jndi-lookup id="jmsTemplateDest" jndi-name="jms/sendToQueue" expected-type="javax.jms.Queue"></jee:jndi-lookup>
    <jee:jndi-lookup id="jmsReceiveJndi" jndi-name="jms/receiveFromQueue" expected-type="javax.jms.Queue"></jee:jndi-lookup>
</beans>

8. Listener class which can send message as well.

file:MyMessageListenerSender.java. This class receives the message from ActiveMQ and send it to “secondQueue” with the help of JmsTemplate. Please note that we could have also used JmsTemplate to receive messages synchronously. But listener classes are better suited for getting messages from Message Providers.

 

package com.javanbeyond;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

// Listener class that gets message from one queue and send it to other queue
// Here Sending and Receiving of messages happening in the same class.
// In practical circumstances, these operations may be distributed in different classes.
public class MyMessageListenerSender implements MessageListener {

  @Autowired
  JmsTemplate jmsTemplate;

  public void onMessage(Message msg) {
    if (msg instanceof TextMessage) {
      final TextMessage text = (TextMessagemsg;
      try {
        System.out.println("Received Message : " + text.getText());
        System.out.println("Sending message to Other Queue");
        jmsTemplate.send(new MessageCreator() {
          public Message createMessage(Session session)
              throws JMSException {
            // Creating Map message. We can create
            // Bytes,Stream,Object,Text message as well
            MapMessage mapMsg = session.createMapMessage();
            Map<Character, Integer> characterMap = calculateLetterNumber(
                text.getText());
            for (Entry<Character, Integer> entry : characterMap
                .entrySet()) {
              mapMsg.setInt(entry.getKey().toString(),
                  entry.getValue());
            }
            return mapMsg;
          }
        });
      catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }

  private Map<Character, Integer> calculateLetterNumber(String sentence) {
    Map<Character, Integer> characterMap = new HashMap<Character, Integer>();
    for (Character c : sentence.toCharArray()) {
      if (characterMap.containsKey(c)) {
        characterMap.put(c, characterMap.get(c1);
      else {
        characterMap.put(c, 1);
      }
    }
    return characterMap;

  }
}

Example Execution:

  1. Start ActiveMQ and login as admin/admin and click on “Queues” tab.
  2. Start Tomcat and deploy the application. You should see three listeners listening to “firstQueue”.

ActiveMQTomcatSpringListener

3. Click on “Send To”  in “Operations” column. Write anything of your choice as the body of the message.

TomcatSpringActiveMQSendQueue

4. Click on “Send”. In eclipse console, you will see the message you typed.

Received Message : This message is listened by Spring...
Sending message to Other Queue

5. Refresh the “Queues” tab in ActiveMQ and click on “secondQueue” and then the first message.

6. You should see each letter of the sentence you typed with their number of occurrences.

TomcatActiveMQSpringEndResult

 

 

Create Durable Topic Subscriber for ActiveMQ

In this tutorial, we are going to learn how to receive messages from ActiveMQ as a durable message subscriber of Topic.

Example Scenario:

We need to get different types of messages from ActiveMQ topics using a standalone java application as durable message subscriber for Topics.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.12.0

2. Eclipse project folder structure:

DurableTopicSubscriberProj

3. A simple Maven Project

Create a simple maven project skipping archetypes.

Simple Maven Project

4. Maven Dependencies

Paste the below content to pom.xml. It has Active MQ client dependency and plugin to execute our class (call main method).

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>ActivemqTopicClientDurable</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.TopicSubscriber</mainClass>
							<arguments>
								<argument>localhost</argument>
								<argument>61616</argument>
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

5. POJO

Serializable class object to be received from Topics.

file:MyMessage.java

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
  /**
   
   */
  private static final long serialVersionUID = 1L;
  int stockPrice;
  String stockName;
  String stockCompany;

  public int getStockPrice() {
    return stockPrice;
  }

  public void setStockPrice(int stockPrice) {
    this.stockPrice = stockPrice;
  }

  public String getStockName() {
    return stockName;
  }

  public void setStockName(String stockName) {
    this.stockName = stockName;
  }

  public String getStockCompany() {
    return stockCompany;
  }

  public void setStockCompany(String stockCompany) {
    this.stockCompany = stockCompany;
  }

  public String toString() {
    return stockName + " ; " + stockCompany + " ; " + stockPrice;
  }

}

6. Subscriber class

This class creates three subscribers to the topic with different subscription name.
In other words, we have three durable subscribers for the topic.

file:TopicSubscriber.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber implements MessageListener {
  public static Boolean TRANSACTIONAL = false;
  public static String TOPIC_NAME = "firstTopic";

  public static void main(String[] argsthrows JMSException {
    String host = args[0];
    String port = args[1];
    // Creating Factory for connection
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "tcp://" + host + ":" + port);
    Connection connection = factory.createConnection();
    // Setting unique client id for durable subscriber
    connection.setClientID("MyListener");
    connection.start();
    // Create 3 new consumers for the topic
    for (int i = 1; i <= 3; i++) {
      Session session = connection.createSession(TRANSACTIONAL,
          Session.AUTO_ACKNOWLEDGE);
      Topic destination = session.createTopic(TOPIC_NAME);
      MessageConsumer consumer = session
          .createDurableSubscriber(destination, "Listener" + i);
      // Setting the listener class whose onMessage method will be called
      // with message as argument
      consumer.setMessageListener(new TopicSubscriber());
    }
  }

  public void onMessage(Message msg) {
    try {

      if (msg instanceof TextMessage) {
        TextMessage text = (TextMessagemsg;
        System.out.println(" - Consuming text msg: " + text.getText());
      else if (msg instanceof ObjectMessage) {
        ObjectMessage objmsg = (ObjectMessagemsg;
        Object obj = objmsg.getObject();
        System.out.println(" - Consuming object msg: " + obj);
      else {
        System.out.println(
            " - Unrecognized Message type " + msg.getClass());
      }
    catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

Example Execution:

  1. Start ActiveMQ and  and build the project with “maven clean and install”. All three listeners starts listening to the topic “firstTopic”.
  2. Login to ActiveMQ admin (“http://localhost:8161/admin/”) by typing user id /password as admin/admin. Click on “Topic” tab.

DurableSubscriptionActiveMQ

3. Click on “firstTopic” and fill in any Text in the body. Click on “Send”.

DurableSubscriberMessage

4. Your eclipse console should show the message three times as there are three subscribers and each got its message.

 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.

 

Create non-durable Topic Subscriber for ActiveMQ

In this tutorial, we are going to learn how to consume messages from ActiveMQ topic as a non-durable message subscriber.

Example Scenario:

We need to get different types of messages from ActiveMQ topics using a standalone java application as non-durable message subscriber.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.12.0

2. Eclipse project folder structure:

NonDurable ActiveMQ subscriber

3. A simple Maven Project

Create a simple maven project skipping archetypes.

Simple Maven Project

4. Maven Dependencies

Paste the below content to pom.xml. It has Active MQ client dependency and plugin to execute our class (call main method).

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>ActivemqTopicClientNonDurable</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.TopicListenerInitiator</mainClass>
							<arguments>
								<argument>localhost</argument>
								<argument>61616</argument>
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

5. POJO

Serializable class object to be received from Topics.

file:MyMessage.java

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
  /**
   
   */
  private static final long serialVersionUID = 1L;
  int stockPrice;
  String stockName;
  String stockCompany;

  public int getStockPrice() {
    return stockPrice;
  }

  public void setStockPrice(int stockPrice) {
    this.stockPrice = stockPrice;
  }

  public String getStockName() {
    return stockName;
  }

  public void setStockName(String stockName) {
    this.stockName = stockName;
  }

  public String getStockCompany() {
    return stockCompany;
  }

  public void setStockCompany(String stockCompany) {
    this.stockCompany = stockCompany;
  }

  public String toString() {
    return stockName + " ; " + stockCompany + " ; " + stockPrice;
  }

}

6. Main method class

This class creates three different threads for each listener to get messages from the same topic “firstTopic”.
In other words, we have three subscribers for the topic.

file:TopicListenerInitiator.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicListenerInitiator {

  public static void main(String[] argsthrows JMSException {
    String host = args[0];
    String port = args[1];
    // Creating Factory for connection
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "tcp://" + host + ":" + port);
    Connection connection = factory.createConnection();
    connection.start();
    // Create 3 new threads as consumers for the topic and start listening
    for (int i = 1; i <= 3; i++) {
      Thread listener = new Thread(
          new TopicListener(connection, "Listener" + i));
      listener.start();
    }
  }
}

7. The actual listener class

This is the class which listens or subscribes to the topic and wait for any messages to arrive at the topic. Each subscriber creates its own session and consumer, though connection remains same and being shared by them.

file:TopicListener.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

// Listener class whose objects listen to the topic
public class TopicListener implements Runnable {
  public static Boolean TRANSACTIONAL = false;
  public static String TOPIC_NAME = "firstTopic";

  Connection connection = null;
  String listener = null;

  public TopicListener(Connection connection, String listener) {
    this.connection = connection;
    this.listener = listener;
  }

  public void run() {
    try {
      // in this session, client would send acknowledgement to broker at
      // the
      // end of sending all the messages.
      Session session = this.connection.createSession(TRANSACTIONAL,
          Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createTopic(TOPIC_NAME);
      MessageConsumer consumer = session.createConsumer(destination);
      // Its an infinite loop. Read till all the messages are consumed.
      System.out.println(this.listener + " listening...");
      while (true) {
        Message msg = consumer.receive();
        if (msg instanceof TextMessage) {
          TextMessage text = (TextMessagemsg;
          System.out.println(this.listener + " - Consuming text msg: "
              + text.getText());
        else if (msg instanceof ObjectMessage) {
          ObjectMessage objmsg = (ObjectMessagemsg;
          Object obj = objmsg.getObject();
          System.out.println(
              this.listener + " - Consuming object msg: " + obj);
        else {
          System.out.println(this.listener
              " - Unrecognized Message type " + msg.getClass());
        }

      }
    catch (JMSException e) {
      e.printStackTrace();
    }

  }
}

Example Execution:

  1. Start ActiveMQ and build the project with “maven clean and install”. All three listeners starts listening to the topic. The sequence of print in your case may be different from mine.
    1. Listener1 listening...
      Listener3 listening...
      Listener2 listening...
      

2.  Login to ActiveMQ admin (“http://locahost:8161/admin/”) by typing user id /password as admin/admin. Click on “Topic” tab.

ActiveMQ Topic subscriber

3. Click on “Send To ” at the right most column of topic “firstTopic” or click on the topic “firstTopic”.

4. Fill in the words “Hello World !!!” or anything of your choice.

ActiveMQAdminConsoleMessages

5. The moment you press “Send”, you should see the messages being consumed by the three listeners on your Eclipse console.

Listener1 - Consuming text msg: Hello World !!!
Listener3 - Consuming text msg: Hello World !!!
Listener2 - Consuming text msg: Hello World !!!

 

 

Receiving Messages from ActiveMQ Queues

In this tutorial we are going to learn how to browse through and receive messages from ActiveMQ queues.

Example Scenario:

In the post “Sending Messages To ActiveMQ“, we sent few messages to ActiveMQ. In this tutorial, we are going to browse those messages and afterwards, consume them from ActiveMQ.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used – 

In case you are not using maven, please find below the list of jars used in this java application:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

activemqreceivemessageseclipseproject

 

3. A simple Maven Project

Create a simple maven project skipping archetypes.

Simple Maven Project

4. Maven Dependencies

file: pom.xml. Maven unit of work to declare project dependencies, compile code and call main method of java JMS client. It has Active MQ client dependency and plugin to execute our class.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageReceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageReceiver</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class which object messages will be deserialized to after reading from queues.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }
    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}

 

6. ActiveMQ Message Consumer

file: QueueMessageReceiver.java. This is the standalone Java client program used to browse and receive messages from Queues. The method browseMessages just goes over all the messages on the queues without consuming them. It either prints the messages or download and save in local directory as files. Browse operation doesn’t remove messages from queues. The method readMessages is the method which consumes the messages from the queue “FirstQueue”, if it has any and keep listening to it for 10 seconds before the program stops. Please take cognizance of how bytes of Stream and Bytes messages are written to local directory ReceivedFiles as files. The method readAndPrintMessage segregates the message based on type using instanceof operator.

package com.javanbeyond;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueMessageReceiver {
    public static Boolean TRANSACTIONAL = false;
    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Queue queue = null;
    MessageConsumer consumer = null;

    private void init(String host, int portthrows JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
        // This is to surpass security where you tell ActiveMQ that this package 
        // classes will be exchanged as ObjectMessages.
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES",
                "com.javanbeyond");
        connection = factory.createConnection();
        connection.start();
        // In this session, client would send acknowledgement to broker at the
        // end of sending all the messages.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. If it is there, it
        // will be used.
        queue = new ActiveMQQueue("FirstQueue");
    }

    public static void main(String[] argsthrows NumberFormatException, JMSException {
        QueueMessageReceiver queueMessageReceiver = new QueueMessageReceiver();
        // Creating connection and initializing before getting messages from
        // queues.
        queueMessageReceiver.init(args[0], Integer.parseInt(args[1]));
        // This method with only browse the messages without it being
        // removed
        // from queue
        System.out
                .println("BROWSING STARTS ===================================");
        queueMessageReceiver.browseMessages();
        System.out.println("BROWSING ENDS ===================================");
        // This method will actually read the messages and remove them from
        // queue
        System.out
                .println("READING STARTS ===================================");
        queueMessageReceiver.readMessages();
        System.out.println("READING ENDS ===================================");
        System.out.println("All Messages received.");
        queueMessageReceiver.consumer.close();
        queueMessageReceiver.session.close();
        queueMessageReceiver.connection.close();

    }

    public void readMessages() throws JMSException {
        consumer = session.createConsumer(queue);
        // It will wait for 10 seconds before all the messages are read.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.invokeAll(Arrays.asList(new Callable<String>() {
                public String call() {
                    int count = 0;
                    while (true) {
                        Message msg;
                        try {
                            msg = consumer.receive();
                            System.out.println(
                                    "Reading Message No. " (++count));
                            readAndPrintMessage(msg);
                        catch (JMSException e) {
                            break;
                        }
                    }
                    return null;
                }
            })10, TimeUnit.SECONDS);
        catch (InterruptedException e) {
            e.printStackTrace();
        // Timeout of 10 seconds.
        executor.shutdown();
    }

    // Browse through every messages
    public void browseMessages() throws JMSException {
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration<?> enumeration = browser.getEnumeration();
        int count = 0;
        while (enumeration.hasMoreElements()) {
            System.out.println("Browsing Message No. " (++count));
            Object obj = enumeration.nextElement();
            readAndPrintMessage(obj);
        }
    }

    void readAndPrintMessage(Object objthrows JMSException {
        if (obj instanceof TextMessage) {
            TextMessage text = (TextMessageobj;
            System.out.println("Starting text msg: " + text.getText());
            readAndPrintAllProperties(text);
        else if (obj instanceof ObjectMessage) {
            ObjectMessage objmsg = (ObjectMessageobj;
            System.out.println("Starting object msg: ");
            System.out.println("Reading integer property from ObjectMessage "
                    + objmsg.getIntProperty("idSquare"));
            Object object = objmsg.getObject();
            MyMessage myMessage = (MyMessageobject;
            System.out.println(myMessage.toString());
        else if (obj instanceof MapMessage) {
            MapMessage mapmsg = (MapMessageobj;
            System.out.println("Starting map msg: ");
            Enumeration<String> mapNames = mapmsg.getMapNames();
            while (mapNames.hasMoreElements()) {
                String mapName = mapNames.nextElement();
                System.out.println("Map Name:" + mapName + "; Map Value:"
                        + mapmsg.getObject(mapName).toString());
            }
        else if (obj instanceof StreamMessage) {
            StreamMessage strmmsg = (StreamMessageobj;
            // The sequence of reading messages should match the sequence of
            // sending them
            System.out.println("Starting stream msg: ");
            System.out.println(
                    "Stream Message reading object : " + strmmsg.readObject());
            System.out.println(
                    "Stream Message reading String : " + strmmsg.readString());
            StringBuffer sb = new StringBuffer();
            System.out.println("Stream Message reading bytes : ");
            byte[] buffer = new byte[4096];
            int c;
            while ((c = strmmsg.readBytes(buffer)) 0) {
                sb.append(new String(buffer, 0, c));
            }
            // Print first 1000 characters of the message due to its hugeness
            System.out.println(sb.toString().substring(01000" ...");
            String fileNameExtension = strmmsg
                    .getStringProperty("fileNameExtension");
            // If file name is not provided, the JMS id will be used as file
            // name
            fileNameExtension = fileNameExtension == null
                    ? strmmsg.getJMSMessageID() ".txt" : fileNameExtension;
            File file = new File("ReceivedFiles/" + fileNameExtension);
            try {
                Files.write(file.toPath(), sb.toString().getBytes(),
                        StandardOpenOption.CREATE);
            catch (IOException e) {
                e.printStackTrace();
            }
            readAndPrintAllProperties(strmmsg);
        else if (obj instanceof BytesMessage) {
            BytesMessage bytesmsg = (BytesMessageobj;
            System.out.println("Starting Bytes msg: ");
            String fileName = bytesmsg.getStringProperty("fileName");
            // If file name is not provided, the JMS id will be used as file
            // name
            File file = new File("ReceivedFiles/"
                    (fileName == null || fileName.isEmpty()
                            ? bytesmsg.getJMSMessageID() : fileName)
                    "." + bytesmsg.getStringProperty("fileExtension"));
            System.out.println("Writing file " + file.getName() " to "
                    + file.getAbsolutePath());
            OutputStream os = null;
            try {
                os = new FileOutputStream(file, true);
            catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            BufferedOutputStream bos = new BufferedOutputStream(os);
            byte[] buffer = new byte[4096];
            int c;
            try {
                while ((c = bytesmsg.readBytes(buffer)) 0) {
                    bos.write(buffer, 0, c);
                }
                bos.flush();
                bos.close();
            catch (IOException e1) {
                e1.printStackTrace();
            }
        else {
            System.out.println("Unrecognized Message type " + obj.getClass());
        }
    }

    // Printing all the properties of messages
    void readAndPrintAllProperties(Message msg) {
        Enumeration<?> propEnum = null;
        try {
            propEnum = msg.getPropertyNames();
        catch (JMSException e) {
            e.printStackTrace();
        }
        while (propEnum.hasMoreElements()) {
            String propName = propEnum.nextElement().toString();
            try {
                System.out.println(
                        "Property Name : " + propName + "; Property Value : "
                                + msg.getObjectProperty(propName).toString());
            catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

Example Execution:

  1. Start ActiveMQ and run the program created here. The queue should have some messages now.

activemqreadingmessagestoberead

 

2. Now, run this program and on eclipse console you should see messages being browsed first and then read.

3. The browsing operation doesn’t change the state of the queues. It just reads the content of the queues and display. On the other hand, consume operation actually moves the messages from ActiveMQ to client program. The queue count should be “0” in ActiveMQ now and the program will wait for 10 seconds before it completes.


4. You should see the read messages converted to files in your ReceivedFiles directory of eclipse project.

activemqmessagesfilesdownloaded

 

Sending Messages To ActiveMQ Queues

In this tutorial, we will learn how to send different types of messages to ActiveMQ queues.

Example Scenario:

We need to read java String and different types of files and convert them to JMS messages and send to ActiveMQ queues to be received by some other application.


Project artifacts:

 1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used – 

In case you are not using maven, please find below the list of jars used in this java application:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

3. A simple Maven Project

Create a simple maven project skipping archetypes.

Simple Maven Project

4. Maven Dependencies

file: pom.xml. Maven unit of work to declare project dependencies, compile code and call main method of java JMS client. It has Active MQ client dependency and plugin to execute our class.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageSender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageSender</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class whose objects will be sent to Queues.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }

    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}

 

6. ActiveMQ Message Producer

file:QueueMessageSender.java. This is the java client program for sending messages to Queues. JMS supports 5 different types of messages, namely TextMessages, MapMessages, BytesMessages, StreamMessages and ObjectMessages. The below client depicts all of the messages in action in a client role.

  1. createAndSendTextMessage : Text messages are the simplest of the JMS messages which takes string as a body of the message. This method creates 100 text messages and sets integer, float and String property of the message before sending it to ActiveMQ.
  2. createAndSendObjectMessage : Object message takes any java serializable object as message body. This method creates 100 object messages of type MyMessage and sets integer property of the message. Please note that the class MyMessage is serializable.
  3. createAndSendMapMessage : Map messages consist of key value pairs where key is generally String while value can be any of the java primitive types. This method creates 100 map messages of different types which includes float, String, Integer and Double. You can even set Byte, Long, Char, Boolean and Object values in Map message.
  4. createAndSendStreamMessage : A stream message is a stream of java primitives that should be read sequentially. In this method, a stream message is created and a file is read and written to Stream message as bytes.
  5. createAndSendBytesMessage : Bytes messages are used to read a stream of uninterpreted bytes. Two different file types, namely pdf and jpg are read in this method and sent to ActiveMQ as messages.

package com.javanbeyond;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import javax.jms.*;

public class QueueMessageSender {

    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Destination dest = null;
    MessageProducer producer = null;
    public static Boolean TRANSACTIONAL = false;

    private void init(String host, int port)
            throws JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory(
                "tcp://" + host + ":" + port);
        connection = factory.createConnection();
        connection.start();
        // in this session, client would send acknowledgement to broker at
        // the
        // end of sending all the messages.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. if it is there, it
        // will be used.
        dest = new ActiveMQQueue("FirstQueue");
        producer = session.createProducer(dest);
        // This is test run, so messages may be lost if ActiveMQ is
        // restarted
        // with messages in it.
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    }

    public static void main(String[] argsthrows JMSException {
        QueueMessageSender queueMessageSender = new QueueMessageSender();
        // Prepares connections and creates a queue, if non exists.
        queueMessageSender.init(args[0], Integer.parseInt(args[1]));
        // Create a Text Message
         queueMessageSender.createAndSendTextMessage();
        // Create an Object Message
         queueMessageSender.createAndSendObjectMessage();
        // Create a Map Message
         queueMessageSender.createAndSendMapMessage();
        // Create a Stream Message
         queueMessageSender.createAndSendStreamMessage();
        // Create a Bytes Message
         queueMessageSender.createAndSendBytesMessage();
        System.out.println("All Messages sent.");
        queueMessageSender.producer.close();
        queueMessageSender.session.close();
        queueMessageSender.connection.close();

    }

    // Create 100 text messages with integer, float and string message headers
    // and send it to Queues
    public void createAndSendTextMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            // Adding text to the body
            TextMessage msg = session.createTextMessage("Message " + i);
            msg.setJMSType("TEXT");
            // Setting integer property
            msg.setIntProperty("id", i);
            // Setting float property
            msg.setFloatProperty("idSqrt"(floatMath.sqrt(i));
            // Every even number
            if (i % == 0) {
                msg.setStringProperty("for " + i, "john");
            else {
                msg.setStringProperty("for " + i, "doe");
            }
            producer.send(msg);
        }
        System.out.println("Text Messages sent.");
    }

    // Create 100 object messages with stock price, name and company name in it.
    public void createAndSendObjectMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MyMessage mymessage = new MyMessage();
            mymessage.setStockPrice(i);
            mymessage.setStockName("Stock " + i);
            mymessage.setStockCompany("Company " + i);
            // Adding Object to the body
            ObjectMessage msg = session.createObjectMessage(mymessage);
            msg.setJMSType("OBJECT");
            // Adding an int property
            msg.setIntProperty("idSquare"(intMath.pow(i, 2));
            producer.send(msg);
        }
        System.out.println("Object Messages sent.");
    }

    // Create 100 Map messages
    public void createAndSendMapMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MapMessage mapMsg = session.createMapMessage();
            // Set Map Values with Keys
            mapMsg.setJMSType("MAP");
            mapMsg.setInt("MessageNum", i);
            mapMsg.setDouble("Log", Math.log10(i));
            mapMsg.setDouble("Square Root", Math.sqrt(i));
            mapMsg.setString("Cube Root", String.valueOf(Math.cbrt(i)));
            mapMsg.setString("Exponential", String.valueOf(Math.exp(i)));
            mapMsg.setInt("Square"(intMath.pow(i, 2));
            mapMsg.setInt("Cube"(intMath.pow(i, 3));
            mapMsg.setFloat("tan\u03B8"(floatMath.tan(i));
            mapMsg.setFloat("sin\u03B8"(floatMath.sin(i));
            mapMsg.setFloat("cos\u03B8"(floatMath.cos(i));
            producer.send(mapMsg);
        }
        System.out.println("Map Messages sent.");

    }

    public void createAndSendStreamMessage() throws JMSException {
        StreamMessage streamMsg = session.createStreamMessage();
        streamMsg.setJMSType("STREAM");
        // Setting integer property
        streamMsg.setIntProperty("id"1);
        // Setting object
        streamMsg.writeObject(new Integer(2));
        // Setting String. Please note that the stream message must be read
        // at consumer side with the same order as it is written in producer
        // side.
        streamMsg.writeString("Stream String");
        File streamFile = new File("src/main/resources/ForStreamMessage.txt");
        streamMsg.setStringProperty("fileNameExtension", streamFile.getName());
        streamMsg.setLongProperty("fileSize", streamFile.length());
        try {
            streamMsg.writeBytes(Files.readAllBytes(streamFile.toPath()));
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(streamMsg);
        System.out.println("Stream Messages sent.");
    }

    // Reads a file and creates a byte message from its content.

    public void createAndSendBytesMessage() throws JMSException {
        BytesMessage bytesMsgPdf = session.createBytesMessage();
        bytesMsgPdf.setJMSType("BYTES");
        // Create message from pdf file
        try {
            bytesMsgPdf.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/pdf-sample.pdf").toPath()));
            bytesMsgPdf.setStringProperty("fileExtension""pdf");
            bytesMsgPdf.setStringProperty("fileName""SampleFileName");
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgPdf);
        // Create message from jpg file
        BytesMessage bytesMsgJpg = session.createBytesMessage();
        bytesMsgJpg.setJMSType("BYTES");
        try {
            bytesMsgJpg.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/Sample.jpg").toPath()));
            bytesMsgJpg.setStringProperty("fileExtension""jpg");
            bytesMsgJpg.setStringProperty("fileName""SampleJpg");
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgJpg);
        System.out.println("Bytes Message sent.");
    }
}

Example Execution:

  1. Start ActiveMQ and run the client program by executing “maven clean and install”.
  2. Once it finishes, open your favorite browser and type “http://localhost:8161/admin”
  3. Provide username as “admin” and password as “admin”, if asked for.
  4. Click on Queue Tab and you should see a Queue named “FirstQueue” created with 303 messages in it. Click on “FirstQueue” queue and you will see all the different types of messages with their JMS message ids and types.

Back to Top