Create Durable Topic Subscriber for ActiveMQ

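In this tutorial, we are going to learn how to receive messages from an ActiveMQ topic as a durable subscriber. With a durable subscription, the broker retains messages published while the subscriber is offline and delivers them when it reconnects.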

Example Scenario:

A standalone Java application needs to receive different types of messages from an ActiveMQ topic, acting as a durable subscriber to that topic.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.12.0

2. Eclipse project folder structure:

[Image: DurableTopicSubscriberProj]

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

[Image: Simple Maven Project]

4. Maven Dependencies

Paste the content below into pom.xml. It declares the ActiveMQ client dependency and the exec-maven-plugin, which calls the main method of our class during the test phase with the broker host and port as arguments.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>ActivemqTopicClientDurable</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.TopicSubscriber</mainClass>
							<arguments>
								<argument>localhost</argument>
								<argument>61616</argument>
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

5. POJO

A Serializable class whose objects can be received from topics as the body of an ObjectMessage.

file: MyMessage.java

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
  /**
   * Version identifier used by Java serialization.
   */
  private static final long serialVersionUID = 1L;
  int stockPrice;
  String stockName;
  String stockCompany;

  public int getStockPrice() {
    return stockPrice;
  }

  public void setStockPrice(int stockPrice) {
    this.stockPrice = stockPrice;
  }

  public String getStockName() {
    return stockName;
  }

  public void setStockName(String stockName) {
    this.stockName = stockName;
  }

  public String getStockCompany() {
    return stockCompany;
  }

  public void setStockCompany(String stockCompany) {
    this.stockCompany = stockCompany;
  }

  public String toString() {
    return stockName + " ; " + stockCompany + " ; " + stockPrice;
  }

}

6. Subscriber class

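This class creates three subscribers to the topic on a single connection, each with a different subscription name. A durable subscription is identified by the connection's client ID together with the subscription name, so the topic ends up with three durable subscriptions.

file: TopicSubscriber.java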

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber implements MessageListener {
  public static Boolean TRANSACTIONAL = false;
  public static String TOPIC_NAME = "firstTopic";

  public static void main(String[] args) throws JMSException {
    String host = args[0];
    String port = args[1];
    // Creating Factory for connection
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "tcp://" + host + ":" + port);
    Connection connection = factory.createConnection();
    // Setting unique client id for durable subscriber
    connection.setClientID("MyListener");
    connection.start();
    // Create 3 new consumers for the topic
    for (int i = 1; i <= 3; i++) {
      Session session = connection.createSession(TRANSACTIONAL,
          Session.AUTO_ACKNOWLEDGE);
      Topic destination = session.createTopic(TOPIC_NAME);
      MessageConsumer consumer = session
          .createDurableSubscriber(destination, "Listener" + i);
      // Setting the listener class whose onMessage method will be called
      // with message as argument
      consumer.setMessageListener(new TopicSubscriber());
    }
  }

  // Called by the JMS provider for every message delivered to a subscriber
  public void onMessage(Message msg) {
    try {
      if (msg instanceof TextMessage) {
        TextMessage text = (TextMessage) msg;
        System.out.println(" - Consuming text msg: " + text.getText());
      } else if (msg instanceof ObjectMessage) {
        ObjectMessage objmsg = (ObjectMessage) msg;
        Object obj = objmsg.getObject();
        System.out.println(" - Consuming object msg: " + obj);
      } else {
        System.out.println(
            " - Unrecognized Message type " + msg.getClass());
      }
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}
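
The main method above reads the broker host and port straight from args, so it fails with an ArrayIndexOutOfBoundsException when they are missing. A minimal sketch of a more defensive variant follows. The BrokerUrl class and its fromArgs method are hypothetical names that are not part of the tutorial project, and the fallback values localhost and 61616 are an assumption taken from the arguments configured in pom.xml.

```java
// Hypothetical helper, not part of the tutorial project.
public class BrokerUrl {

    // Builds the tcp:// URL that main assembles from its two arguments.
    public static String fromArgs(String[] args) {
        // Assumption: fall back to the values configured in pom.xml.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 61616;
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // With no arguments this prints tcp://localhost:61616
        System.out.println(fromArgs(args));
    }
}
```

The returned string could then be passed to the ActiveMQConnectionFactory constructor in place of the inline concatenation.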

Example Execution:

  1. Start ActiveMQ and build the project with the Maven goals “clean install”. The exec plugin runs TopicSubscriber during the build, and all three listeners start listening to the topic “firstTopic”.
  2. Log in to the ActiveMQ admin console (“http://localhost:8161/admin/”) with admin/admin as the user ID and password. Click on the “Topics” tab.

[Image: DurableSubscriptionActiveMQ]

  3. Click on “firstTopic”, enter any text in the message body, and click on “Send”.

[Image: DurableSubscriberMessage]

  4. Your Eclipse console should show the message three times, because there are three subscribers and each one receives its own copy.

 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.
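
The three identical lines follow from the broker keeping a separate backlog for every durable subscription. The toy in-memory model below illustrates that idea only; it is not the ActiveMQ API, and ToyDurableTopic with its subscribe, publish and drain methods are made-up names for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT how ActiveMQ is implemented or called.
public class ToyDurableTopic {

    // One backlog of undelivered messages per durable subscription name.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();

    // Registering a name creates the subscription and its backlog.
    public void subscribe(String subscriptionName) {
        pending.putIfAbsent(subscriptionName, new ArrayList<>());
    }

    // Every registered subscription gets its own copy of the message.
    public void publish(String text) {
        for (List<String> backlog : pending.values()) {
            backlog.add(text);
        }
    }

    // Hands over and clears whatever accumulated for this subscription.
    public List<String> drain(String subscriptionName) {
        List<String> backlog =
                pending.getOrDefault(subscriptionName, new ArrayList<>());
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    public static void main(String[] args) {
        ToyDurableTopic topic = new ToyDurableTopic();
        for (int i = 1; i <= 3; i++) {
            topic.subscribe("Listener" + i);
        }
        topic.publish("Hi");
        for (int i = 1; i <= 3; i++) {
            System.out.println(
                    "Listener" + i + " got " + topic.drain("Listener" + i));
        }
    }
}
```

A subscription that does not drain for a while simply accumulates messages, which mirrors what a durable subscriber finds waiting after it has been offline.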

 

Receiving Messages from ActiveMQ Queues

In this tutorial we are going to learn how to browse through and receive messages from ActiveMQ queues.

Example Scenario:

In the post “Sending Messages To ActiveMQ”, we sent a few messages to ActiveMQ. In this tutorial, we are going to browse those messages and then consume them from ActiveMQ.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used –

If you are not using Maven, these are the jars this Java application needs:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

[Image: activemqreceivemessageseclipseproject]

 

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

[Image: Simple Maven Project]

4. Maven Dependencies

file: pom.xml. This Maven configuration declares the project dependencies, compiles the code and calls the main method of the Java JMS client. It contains the ActiveMQ client dependency and the plugin that executes our class.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageReceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageReceiver</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class into which object messages are deserialized after being read from the queue.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     * Version identifier used by Java serialization.
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }
    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}

 

6. ActiveMQ Message Consumer

file: QueueMessageReceiver.java. This is the standalone Java client program used to browse and receive messages from queues. The method browseMessages goes over all the messages on the queue without consuming them, so the browse operation doesn’t remove messages from the queue. The method readMessages consumes the messages from the queue “FirstQueue”, if it has any, and stops listening once 10 seconds have elapsed. Both methods pass each message to readAndPrintMessage, which separates the messages by type using the instanceof operator and either prints them or saves them as files in the local directory ReceivedFiles. Note how the bytes of Stream and Bytes messages are written to files in that directory.
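
The file handling described above (pick a file name, fall back to an ID when none was sent, then copy the payload in 4096-byte chunks) can be tried without a broker. The sketch below uses only the JDK; ChunkedFileWriter, chooseName and copyInChunks are hypothetical names, and a plain InputStream stands in for the message body.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; a plain InputStream stands in for the JMS message body.
public class ChunkedFileWriter {

    // Uses the supplied name, or the fallback ID when the name is missing.
    public static String chooseName(String fileName, String fallbackId,
            String extension) {
        String base = (fileName == null || fileName.isEmpty())
                ? fallbackId : fileName;
        return base + "." + extension;
    }

    // Copies the source to the target in 4096-byte chunks and returns the
    // number of bytes written. An existing file is replaced, not appended to.
    public static long copyInChunks(InputStream source, Path target) {
        byte[] buffer = new byte[4096];
        long total = 0;
        try (OutputStream out =
                new BufferedOutputStream(Files.newOutputStream(target))) {
            int c;
            while ((c = source.read(buffer)) > 0) {
                out.write(buffer, 0, c);
                total += c;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(chooseName(null, "ID-1", "pdf"));
        System.out.println(chooseName("SampleFileName", "ID-1", "pdf"));
    }
}
```

Replacing the file on every write, rather than appending, matters whenever the same message may be written more than once, for instance once while browsing and once while consuming.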

package com.javanbeyond;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueMessageReceiver {
    public static Boolean TRANSACTIONAL = false;
    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Queue queue = null;
    MessageConsumer consumer = null;

    private void init(String host, int port) throws JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
        // ActiveMQ only deserializes ObjectMessage bodies whose classes are
        // in trusted packages; this property marks the package of MyMessage
        // as trusted.
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES",
                "com.javanbeyond");
        connection = factory.createConnection();
        connection.start();
        // Non-transacted session: with AUTO_ACKNOWLEDGE, each message is
        // acknowledged automatically once the client has received it.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. If it is there, it
        // will be used.
        queue = new ActiveMQQueue("FirstQueue");
    }

    public static void main(String[] args) throws NumberFormatException, JMSException {
        QueueMessageReceiver queueMessageReceiver = new QueueMessageReceiver();
        // Creating connection and initializing before getting messages from
        // queues.
        queueMessageReceiver.init(args[0], Integer.parseInt(args[1]));
        // This method only browses the messages; they are not removed from
        // the queue
        System.out
                .println("BROWSING STARTS ===================================");
        queueMessageReceiver.browseMessages();
        System.out.println("BROWSING ENDS ===================================");
        // This method will actually read the messages and remove them from
        // queue
        System.out
                .println("READING STARTS ===================================");
        queueMessageReceiver.readMessages();
        System.out.println("READING ENDS ===================================");
        System.out.println("All Messages received.");
        queueMessageReceiver.consumer.close();
        queueMessageReceiver.session.close();
        queueMessageReceiver.connection.close();

    }

    public void readMessages() throws JMSException {
        consumer = session.createConsumer(queue);
        // Reading is capped at 10 seconds: when the timeout expires,
        // invokeAll cancels the task and interrupts the blocking receive().
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.invokeAll(Arrays.asList(new Callable<String>() {
                public String call() {
                    int count = 0;
                    while (true) {
                        Message msg;
                        try {
                            msg = consumer.receive();
                            System.out.println(
                                    "Reading Message No. " + (++count));
                            readAndPrintMessage(msg);
                        } catch (JMSException e) {
                            // A JMSException (for example when the timeout
                            // interrupts receive()) ends the loop
                            break;
                        }
                    }
                    return null;
                }
            }), 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }

    // Browse through every message without removing it from the queue
    public void browseMessages() throws JMSException {
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration<?> enumeration = browser.getEnumeration();
        int count = 0;
        while (enumeration.hasMoreElements()) {
            System.out.println("Browsing Message No. " + (++count));
            Object obj = enumeration.nextElement();
            readAndPrintMessage(obj);
        }
        browser.close();
    }

    void readAndPrintMessage(Object obj) throws JMSException {
        if (obj instanceof TextMessage) {
            TextMessage text = (TextMessage) obj;
            System.out.println("Starting text msg: " + text.getText());
            readAndPrintAllProperties(text);
        } else if (obj instanceof ObjectMessage) {
            ObjectMessage objmsg = (ObjectMessage) obj;
            System.out.println("Starting object msg: ");
            System.out.println("Reading integer property from ObjectMessage "
                    + objmsg.getIntProperty("idSquare"));
            Object object = objmsg.getObject();
            MyMessage myMessage = (MyMessage) object;
            System.out.println(myMessage.toString());
        } else if (obj instanceof MapMessage) {
            MapMessage mapmsg = (MapMessage) obj;
            System.out.println("Starting map msg: ");
            Enumeration<String> mapNames = mapmsg.getMapNames();
            while (mapNames.hasMoreElements()) {
                String mapName = mapNames.nextElement();
                System.out.println("Map Name:" + mapName + "; Map Value:"
                        + mapmsg.getObject(mapName).toString());
            }
        } else if (obj instanceof StreamMessage) {
            StreamMessage strmmsg = (StreamMessage) obj;
            // The sequence of reading messages should match the sequence of
            // sending them
            System.out.println("Starting stream msg: ");
            System.out.println(
                    "Stream Message reading object : " + strmmsg.readObject());
            System.out.println(
                    "Stream Message reading String : " + strmmsg.readString());
            StringBuffer sb = new StringBuffer();
            System.out.println("Stream Message reading bytes : ");
            byte[] buffer = new byte[4096];
            int c;
            while ((c = strmmsg.readBytes(buffer)) > 0) {
                sb.append(new String(buffer, 0, c));
            }
            // Print at most the first 1000 characters, as the message can be
            // huge
            System.out.println(
                    sb.substring(0, Math.min(1000, sb.length())) + " ...");
            String fileNameExtension = strmmsg
                    .getStringProperty("fileNameExtension");
            // If file name is not provided, the JMS id will be used as file
            // name
            fileNameExtension = fileNameExtension == null
                    ? strmmsg.getJMSMessageID() + ".txt" : fileNameExtension;
            File file = new File("ReceivedFiles/" + fileNameExtension);
            try {
                Files.write(file.toPath(), sb.toString().getBytes(),
                        StandardOpenOption.CREATE);
            } catch (IOException e) {
                e.printStackTrace();
            }
            readAndPrintAllProperties(strmmsg);
        } else if (obj instanceof BytesMessage) {
            BytesMessage bytesmsg = (BytesMessage) obj;
            System.out.println("Starting Bytes msg: ");
            String fileName = bytesmsg.getStringProperty("fileName");
            // If file name is not provided, the JMS id will be used as file
            // name
            File file = new File("ReceivedFiles/"
                    + (fileName == null || fileName.isEmpty()
                            ? bytesmsg.getJMSMessageID() : fileName)
                    + "." + bytesmsg.getStringProperty("fileExtension"));
            System.out.println("Writing file " + file.getName() + " to "
                    + file.getAbsolutePath());
            byte[] buffer = new byte[4096];
            int c;
            // Overwrite instead of appending: this method runs once while
            // browsing and again while reading, so appending would store the
            // content twice. try-with-resources closes the stream.
            try (OutputStream bos = new BufferedOutputStream(
                    new FileOutputStream(file))) {
                while ((c = bytesmsg.readBytes(buffer)) > 0) {
                    bos.write(buffer, 0, c);
                }
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        } else {
            System.out.println("Unrecognized Message type " + obj.getClass());
        }
    }

    // Printing all the properties of messages
    void readAndPrintAllProperties(Message msg) {
        Enumeration<?> propEnum = null;
        try {
            propEnum = msg.getPropertyNames();
        } catch (JMSException e) {
            e.printStackTrace();
            // Without the property names there is nothing to print
            return;
        }
        while (propEnum.hasMoreElements()) {
            String propName = propEnum.nextElement().toString();
            try {
                System.out.println(
                        "Property Name : " + propName + "; Property Value : "
                                + msg.getObjectProperty(propName).toString());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}
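
The time limit in readMessages relies on ExecutorService.invokeAll with a timeout: a task that would otherwise block forever is cancelled once the time is up. The broker-free sketch below shows the same pattern with a BlockingQueue standing in for the JMS consumer. TimedDrain and drainFor are hypothetical names, and take() plays the role of the blocking receive() call.

```java
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: a BlockingQueue plays the role of the JMS consumer.
public class TimedDrain {

    // Takes elements until the timeout cancels the worker; returns the count.
    public static int drainFor(BlockingQueue<String> queue, long timeoutMillis) {
        AtomicInteger count = new AtomicInteger();
        Callable<Void> task = () -> {
            try {
                while (true) {
                    queue.take(); // blocks while the queue is empty
                    count.incrementAndGet();
                }
            } catch (InterruptedException e) {
                return null; // the timeout cancelled this task
            }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Returns when the task completes or the timeout expires;
            // a task still running at that point is cancelled.
            executor.invokeAll(Collections.singletonList(task),
                    timeoutMillis, TimeUnit.MILLISECONDS);
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        return count.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("one");
        queue.add("two");
        queue.add("three");
        // Prints "Drained 3 messages" after roughly half a second
        System.out.println("Drained " + drainFor(queue, 500) + " messages");
    }
}
```

The call always waits for the full timeout when the task never finishes on its own, which is why the receiver program takes about 10 seconds even when the queue empties immediately.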

Example Execution:

  1. Start ActiveMQ and run the sender program from the post “Sending Messages To ActiveMQ”. The queue should now contain some messages.

[Image: activemqreadingmessagestoberead]

 

  2. Now run this program. On the Eclipse console you should see the messages being browsed first and then read.

  3. The browsing operation doesn’t change the state of the queue; it just reads the content of the queue and displays it. The consume operation, on the other hand, actually moves the messages from ActiveMQ to the client program. The queue count in ActiveMQ should now be “0”, and the program completes once the 10-second timeout has elapsed.


  4. You should see the Stream and Bytes messages saved as files in the ReceivedFiles directory of your Eclipse project.

[Image: activemqmessagesfilesdownloaded]
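
The difference between browsing and consuming has a close analogue in the JDK collections: iterating over a queue leaves it untouched, while poll() removes what it returns. The sketch below is only that analogy, with BrowseVsConsume as a hypothetical class name; it does not use the JMS QueueBrowser or MessageConsumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Analogy only: a java.util.Queue stands in for the ActiveMQ queue.
public class BrowseVsConsume {

    // Browsing: copy what is on the queue; nothing is removed.
    public static List<String> browse(Queue<String> queue) {
        return new ArrayList<>(queue);
    }

    // Consuming: poll() removes each element as it is returned.
    public static List<String> consume(Queue<String> queue) {
        List<String> received = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2"));
        // prints "[m1, m2] left: 2"
        System.out.println(browse(queue) + " left: " + queue.size());
        // prints "[m1, m2] left: 0"
        System.out.println(consume(queue) + " left: " + queue.size());
    }
}
```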

 

Sending Messages To ActiveMQ Queues

In this tutorial, we will learn how to send different types of messages to ActiveMQ queues.

Example Scenario:

We need to read a Java String and different types of files, convert them to JMS messages, and send them to an ActiveMQ queue to be received by some other application.


Project artifacts:

 1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used –

If you are not using Maven, these are the jars this Java application needs:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

[Image: Simple Maven Project]

4. Maven Dependencies

file: pom.xml. This Maven configuration declares the project dependencies, compiles the code and calls the main method of the Java JMS client. It contains the ActiveMQ client dependency and the plugin that executes our class.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageSender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageSender</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class whose objects will be sent to Queues.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     * Version identifier used by Java serialization.
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }

    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}
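
MyMessage has to implement Serializable because the body of an object message travels as a serialized Java object. A rough, broker-free sketch of that round trip is shown below. SerializationRoundTrip and its nested Stock class are hypothetical stand-ins written for this example, and the sketch uses plain JDK object streams rather than anything from the ActiveMQ client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; no JMS or ActiveMQ classes are involved.
public class SerializationRoundTrip {

    // Stand-in with the same kind of fields as MyMessage.
    public static class Stock implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String stockName;
        public final int stockPrice;

        public Stock(String stockName, int stockPrice) {
            this.stockName = stockName;
            this.stockPrice = stockPrice;
        }

        @Override
        public String toString() {
            return stockName + " ; " + stockPrice;
        }
    }

    // Turns the object into bytes, as a sender would before transmission.
    public static byte[] toBytes(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the object from bytes, as a receiver would.
    public static Object fromBytes(byte[] data) {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object copy = fromBytes(toBytes(new Stock("Stock 1", 1)));
        System.out.println(copy); // prints "Stock 1 ; 1"
    }
}
```

The receiving side needs the same class on its classpath, which is why MyMessage appears in both the sender and the receiver project.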

 

6. ActiveMQ Message Producer

file: QueueMessageSender.java. This is the Java client program for sending messages to queues. JMS supports five types of messages: TextMessage, MapMessage, BytesMessage, StreamMessage and ObjectMessage. The client below sends each of them:

  1. createAndSendTextMessage : Text messages are the simplest JMS messages; they take a String as the message body. This method creates 100 text messages and sets an integer, a float and a String property on each message before sending it to ActiveMQ.
  2. createAndSendObjectMessage : An object message takes any serializable Java object as its body. This method creates 100 object messages of type MyMessage and sets an integer property on each one. Please note that the class MyMessage is serializable.
  3. createAndSendMapMessage : Map messages consist of key-value pairs where the key is a String and the value can be any of the Java primitive types or a String. This method creates 100 map messages, each holding int, double, String and float values. You can also set byte, long, char, boolean and Object values in a map message.
  4. createAndSendStreamMessage : A stream message is a stream of Java primitives that must be read sequentially, in the order in which they were written. This method creates one stream message, then reads a file and writes its content to the message as bytes.
  5. createAndSendBytesMessage : Bytes messages carry a stream of uninterpreted bytes. This method reads two files of different types, a pdf and a jpg, and sends each one to ActiveMQ as a message.
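
The rule that a stream message must be read in the order it was written can be illustrated with the JDK's DataOutputStream and DataInputStream, which behave similarly. This is an analogy under that assumption, not the JMS StreamMessage API, and SequentialStreamDemo is a hypothetical class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Analogy only: JDK data streams instead of a JMS StreamMessage.
public class SequentialStreamDemo {

    // Writes an int, a String and a byte payload, in that order.
    public static byte[] write(int number, String text, byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(number);
            out.writeUTF(text);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    public static String read(byte[] data) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(data))) {
            int number = in.readInt();
            String text = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return number + " | " + text + " | " + payload.length + " bytes";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(2, "Stream String", new byte[] {1, 2, 3});
        System.out.println(read(data)); // prints "2 | Stream String | 3 bytes"
    }
}
```

Reading the fields in a different order than they were written would misinterpret the bytes, which is the same reason the receiver has to mirror the sender's write order for a stream message.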

package com.javanbeyond;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import javax.jms.*;

public class QueueMessageSender {

    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Destination dest = null;
    MessageProducer producer = null;
    public static Boolean TRANSACTIONAL = false;

    private void init(String host, int port)
            throws JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory(
                "tcp://" + host + ":" + port);
        connection = factory.createConnection();
        connection.start();
        // Non-transacted session. AUTO_ACKNOWLEDGE governs how messages
        // consumed on this session are acknowledged; it does not affect
        // sending.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. If it is there, it
        // will be used.
        dest = new ActiveMQQueue("FirstQueue");
        producer = session.createProducer(dest);
        // This is a test run, so messages are sent as non-persistent and may
        // be lost if ActiveMQ is restarted with messages in it.
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    }

    public static void main(String[] args) throws JMSException {
        QueueMessageSender queueMessageSender = new QueueMessageSender();
        // Prepares the connection and creates the queue if none exists.
        queueMessageSender.init(args[0], Integer.parseInt(args[1]));
        // Create Text Messages
        queueMessageSender.createAndSendTextMessage();
        // Create Object Messages
        queueMessageSender.createAndSendObjectMessage();
        // Create Map Messages
        queueMessageSender.createAndSendMapMessage();
        // Create a Stream Message
        queueMessageSender.createAndSendStreamMessage();
        // Create Bytes Messages
        queueMessageSender.createAndSendBytesMessage();
        System.out.println("All Messages sent.");
        queueMessageSender.producer.close();
        queueMessageSender.session.close();
        queueMessageSender.connection.close();

    }

    // Create 100 text messages with integer, float and string message
    // properties and send them to the queue
    public void createAndSendTextMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            // Adding text to the body
            TextMessage msg = session.createTextMessage("Message " + i);
            msg.setJMSType("TEXT");
            // Setting integer property
            msg.setIntProperty("id", i);
            // Setting float property
            msg.setFloatProperty("idSqrt", (float) Math.sqrt(i));
            // Every even number
            if (i % 2 == 0) {
                msg.setStringProperty("for " + i, "john");
            } else {
                msg.setStringProperty("for " + i, "doe");
            }
            producer.send(msg);
        }
        System.out.println("Text Messages sent.");
    }

    // Create 100 object messages with stock price, name and company name in it.
    public void createAndSendObjectMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MyMessage mymessage = new MyMessage();
            mymessage.setStockPrice(i);
            mymessage.setStockName("Stock " + i);
            mymessage.setStockCompany("Company " + i);
            // Adding Object to the body
            ObjectMessage msg = session.createObjectMessage(mymessage);
            msg.setJMSType("OBJECT");
            // Adding an int property
            msg.setIntProperty("idSquare", (int) Math.pow(i, 2));
            producer.send(msg);
        }
        System.out.println("Object Messages sent.");
    }

    // Create 100 Map messages
    public void createAndSendMapMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MapMessage mapMsg = session.createMapMessage();
            // Set Map Values with Keys
            mapMsg.setJMSType("MAP");
            mapMsg.setInt("MessageNum", i);
            mapMsg.setDouble("Log", Math.log10(i));
            mapMsg.setDouble("Square Root", Math.sqrt(i));
            mapMsg.setString("Cube Root", String.valueOf(Math.cbrt(i)));
            mapMsg.setString("Exponential", String.valueOf(Math.exp(i)));
            mapMsg.setInt("Square", (int) Math.pow(i, 2));
            mapMsg.setInt("Cube", (int) Math.pow(i, 3));
            mapMsg.setFloat("tan\u03B8", (float) Math.tan(i));
            mapMsg.setFloat("sin\u03B8", (float) Math.sin(i));
            mapMsg.setFloat("cos\u03B8", (float) Math.cos(i));
            producer.send(mapMsg);
        }
        System.out.println("Map Messages sent.");

    }

    // Create one stream message holding an object, a string and the bytes
    // of a file
    public void createAndSendStreamMessage() throws JMSException {
        StreamMessage streamMsg = session.createStreamMessage();
        streamMsg.setJMSType("STREAM");
        // Setting integer property
        streamMsg.setIntProperty("id", 1);
        // Setting object
        streamMsg.writeObject(Integer.valueOf(2));
        // Setting String. Please note that the stream message must be read
        // on the consumer side in the same order as it is written on the
        // producer side.
        streamMsg.writeString("Stream String");
        File streamFile = new File("src/main/resources/ForStreamMessage.txt");
        streamMsg.setStringProperty("fileNameExtension", streamFile.getName());
        streamMsg.setLongProperty("fileSize", streamFile.length());
        try {
            streamMsg.writeBytes(Files.readAllBytes(streamFile.toPath()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(streamMsg);
        System.out.println("Stream Messages sent.");
    }

    // Reads a pdf and a jpg file and sends each one as a bytes message.
    public void createAndSendBytesMessage() throws JMSException {
        BytesMessage bytesMsgPdf = session.createBytesMessage();
        bytesMsgPdf.setJMSType("BYTES");
        // Create message from pdf file
        try {
            bytesMsgPdf.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/pdf-sample.pdf").toPath()));
            bytesMsgPdf.setStringProperty("fileExtension", "pdf");
            bytesMsgPdf.setStringProperty("fileName", "SampleFileName");
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgPdf);
        // Create message from jpg file
        BytesMessage bytesMsgJpg = session.createBytesMessage();
        bytesMsgJpg.setJMSType("BYTES");
        try {
            bytesMsgJpg.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/Sample.jpg").toPath()));
            bytesMsgJpg.setStringProperty("fileExtension", "jpg");
            bytesMsgJpg.setStringProperty("fileName", "SampleJpg");
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgJpg);
        System.out.println("Bytes Message sent.");
    }
}

Example Execution:

  1. Start ActiveMQ and run the client program by building the project with the Maven goals “clean install”.
  2. Once it finishes, open your favorite browser and go to “http://localhost:8161/admin”.
  3. Provide “admin” as the username and “admin” as the password, if asked.
  4. Click on the “Queues” tab and you should see a queue named “FirstQueue” with 303 messages in it (100 text, 100 object, 100 map, 1 stream and 2 bytes messages). Click on “FirstQueue” and you will see all the different types of messages with their JMS message IDs and types.
