Receiving Messages from ActiveMQ Queues

In this tutorial we are going to learn how to browse through and receive messages from ActiveMQ queues.

Example Scenario:

In the post “Sending Messages To ActiveMQ”, we sent a few messages to ActiveMQ. In this tutorial, we are going to browse those messages and then consume them from ActiveMQ.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE Developers 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used – 

If you are not using Maven, these are the JARs this Java application needs:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

[Screenshot: Eclipse project folder structure]

 

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

[Screenshot: creating a simple Maven project]

4. Maven Dependencies

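file: pom.xml. The Maven build file declares the project dependencies, compiles the code and calls the main method of the Java JMS client. It contains the ActiveMQ client dependency and the exec plugin that runs our class. Because the plugin's java goal is bound to the test phase, running mvn test starts QueueMessageReceiver with the arguments localhost and 61616.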

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageReceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageReceiver</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class. The payload of each ObjectMessage read from the queue is deserialized into an instance of this class.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }

    @Override
    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}
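To see why the POJO has to be Serializable, here is a minimal sketch of a serialization round trip using only the JDK. It is not the code ActiveMQ runs internally; it only illustrates the general idea that an object is turned into bytes on the sending side and rebuilt on the receiving side. The class StockQuote is a hypothetical stand-in for MyMessage so that the sketch compiles on its own, and the helper names toBytes and fromBytes are ours.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRoundTrip {
    // Hypothetical stand-in for MyMessage (same fields, same toString format).
    static class StockQuote implements Serializable {
        private static final long serialVersionUID = 1L;
        int stockPrice;
        String stockName;
        String stockCompany;

        @Override
        public String toString() {
            return stockName + " ; " + stockCompany + " ; " + stockPrice;
        }
    }

    // Turns a Serializable object into bytes, as a sender would have to do.
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the object from bytes, as a receiver would have to do.
    static Object fromBytes(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StockQuote sent = new StockQuote();
        sent.stockName = "ACME";
        sent.stockCompany = "Acme Corp";
        sent.stockPrice = 42;

        StockQuote received = (StockQuote) fromBytes(toBytes(sent));
        System.out.println(received);
    }
}
```

The received object is a new instance with the same field values, which is why both sender and receiver need the same class on their classpath.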

 

6. ActiveMQ Message Consumer

file: QueueMessageReceiver.java. This is the standalone Java client program used to browse and receive messages from queues. The method browseMessages goes over all the messages on the queue without consuming them, so browsing doesn’t remove anything from the queue. The method readMessages consumes the messages from the queue “FirstQueue”, if it has any, and keeps listening to it for 10 seconds before the program stops. Both methods hand each message to readAndPrintMessage, which distinguishes the message types using the instanceof operator and either prints the content or saves it as a file in a local directory. Note how the bytes of Stream and Bytes messages are written as files to the local directory ReceivedFiles.
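Before the full listing, the difference between browsing and consuming can be illustrated with a plain in-memory queue. This is only an analogy under our own assumptions: a java.util.concurrent.BlockingQueue stands in for the ActiveMQ queue, and the class and method names (BrowseVersusConsume, browse, consume) are made up for this sketch. No JMS API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrowseVersusConsume {
    // "Browsing": copy what is on the queue without removing anything.
    static List<String> browse(BlockingQueue<String> queue) {
        return new ArrayList<>(queue);
    }

    // "Consuming": remove messages until none arrives within waitMillis.
    static List<String> consume(BlockingQueue<String> queue, long waitMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        String msg;
        while ((msg = queue.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
            received.add(msg);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");

        System.out.println("Browsed: " + browse(queue)
                + ", left on queue: " + queue.size());
        System.out.println("Consumed: " + consume(queue, 100)
                + ", left on queue: " + queue.size());
    }
}
```

After browsing, both messages are still on the queue; after consuming, the queue is empty. The listing below does the same two passes against ActiveMQ.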

package com.javanbeyond;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueMessageReceiver {
    public static Boolean TRANSACTIONAL = false;
    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Queue queue = null;
    MessageConsumer consumer = null;

    private void init(String host, int port) throws JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
        // ActiveMQ only deserializes ObjectMessage payloads from trusted
        // packages, so declare the package of MyMessage as trusted.
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES",
                "com.javanbeyond");
        connection = factory.createConnection();
        connection.start();
        // With AUTO_ACKNOWLEDGE, the session acknowledges each message
        // automatically as soon as the client has received it.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. If it is there, it
        // will be used.
        queue = new ActiveMQQueue("FirstQueue");
    }

    public static void main(String[] args) throws NumberFormatException, JMSException {
        QueueMessageReceiver queueMessageReceiver = new QueueMessageReceiver();
        // Creating connection and initializing before getting messages from
        // queues.
        queueMessageReceiver.init(args[0], Integer.parseInt(args[1]));
        // This method will only browse the messages without removing them
        // from the queue
        System.out
                .println("BROWSING STARTS ===================================");
        queueMessageReceiver.browseMessages();
        System.out.println("BROWSING ENDS ===================================");
        // This method will actually read the messages and remove them from
        // queue
        System.out
                .println("READING STARTS ===================================");
        queueMessageReceiver.readMessages();
        System.out.println("READING ENDS ===================================");
        System.out.println("All Messages received.");
        queueMessageReceiver.consumer.close();
        queueMessageReceiver.session.close();
        queueMessageReceiver.connection.close();

    }

    public void readMessages() throws JMSException {
        consumer = session.createConsumer(queue);
        // Keep receiving for up to 10 seconds; invokeAll cancels the task
        // once the timeout expires.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.invokeAll(Arrays.asList(new Callable<String>() {
                public String call() {
                    int count = 0;
                    while (true) {
                        Message msg;
                        try {
                            msg = consumer.receive();
                            System.out.println(
                                    "Reading Message No. " + (++count));
                            readAndPrintMessage(msg);
                        } catch (JMSException e) {
                            break;
                        }
                    }
                    return null;
                }
            }), 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // The 10-second timeout has elapsed; release the executor.
        executor.shutdown();
    }

    // Browse through every message
    public void browseMessages() throws JMSException {
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration<?> enumeration = browser.getEnumeration();
        int count = 0;
        while (enumeration.hasMoreElements()) {
            System.out.println("Browsing Message No. " + (++count));
            Object obj = enumeration.nextElement();
            readAndPrintMessage(obj);
        }
    }

    void readAndPrintMessage(Object obj) throws JMSException {
        if (obj instanceof TextMessage) {
            TextMessage text = (TextMessage) obj;
            System.out.println("Starting text msg: " + text.getText());
            readAndPrintAllProperties(text);
        } else if (obj instanceof ObjectMessage) {
            ObjectMessage objmsg = (ObjectMessage) obj;
            System.out.println("Starting object msg: ");
            System.out.println("Reading integer property from ObjectMessage "
                    + objmsg.getIntProperty("idSquare"));
            Object object = objmsg.getObject();
            MyMessage myMessage = (MyMessage) object;
            System.out.println(myMessage.toString());
        } else if (obj instanceof MapMessage) {
            MapMessage mapmsg = (MapMessage) obj;
            System.out.println("Starting map msg: ");
            Enumeration<String> mapNames = mapmsg.getMapNames();
            while (mapNames.hasMoreElements()) {
                String mapName = mapNames.nextElement();
                System.out.println("Map Name:" + mapName + "; Map Value:"
                        + mapmsg.getObject(mapName).toString());
            }
        } else if (obj instanceof StreamMessage) {
            StreamMessage strmmsg = (StreamMessage) obj;
            // The sequence of reading messages should match the sequence of
            // sending them
            System.out.println("Starting stream msg: ");
            System.out.println(
                    "Stream Message reading object : " + strmmsg.readObject());
            System.out.println(
                    "Stream Message reading String : " + strmmsg.readString());
            StringBuffer sb = new StringBuffer();
            System.out.println("Stream Message reading bytes : ");
            byte[] buffer = new byte[4096];
            int c;
            while ((c = strmmsg.readBytes(buffer)) > 0) {
                sb.append(new String(buffer, 0, c));
            }
            // Print at most the first 1000 characters, as the message may be huge
            System.out.println(
                    sb.substring(0, Math.min(sb.length(), 1000)) + " ...");
            String fileNameExtension = strmmsg
                    .getStringProperty("fileNameExtension");
            // If file name is not provided, the JMS id will be used as file
            // name
            fileNameExtension = fileNameExtension == null
                    ? strmmsg.getJMSMessageID() + ".txt" : fileNameExtension;
            File file = new File("ReceivedFiles/" + fileNameExtension);
            try {
                Files.write(file.toPath(), sb.toString().getBytes(),
                        StandardOpenOption.CREATE);
            } catch (IOException e) {
                e.printStackTrace();
            }
            readAndPrintAllProperties(strmmsg);
        } else if (obj instanceof BytesMessage) {
            BytesMessage bytesmsg = (BytesMessage) obj;
            System.out.println("Starting Bytes msg: ");
            String fileName = bytesmsg.getStringProperty("fileName");
            // If file name is not provided, the JMS id will be used as file
            // name
            File file = new File("ReceivedFiles/"
                    + (fileName == null || fileName.isEmpty()
                            ? bytesmsg.getJMSMessageID() : fileName)
                    + "." + bytesmsg.getStringProperty("fileExtension"));
            System.out.println("Writing file " + file.getName() + " to "
                    + file.getAbsolutePath());
            OutputStream os = null;
            try {
                os = new FileOutputStream(file, true);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            BufferedOutputStream bos = new BufferedOutputStream(os);
            byte[] buffer = new byte[4096];
            int c;
            try {
                while ((c = bytesmsg.readBytes(buffer)) > 0) {
                    bos.write(buffer, 0, c);
                }
                bos.flush();
                bos.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        } else {
            System.out.println("Unrecognized Message type " + obj.getClass());
        }
    }

    // Printing all the properties of messages
    void readAndPrintAllProperties(Message msg) {
        Enumeration<?> propEnum = null;
        try {
            propEnum = msg.getPropertyNames();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        while (propEnum.hasMoreElements()) {
            String propName = propEnum.nextElement().toString();
            try {
                System.out.println(
                        "Property Name : " + propName + "; Property Value : "
                                + msg.getObjectProperty(propName).toString());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}
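The 10-second limit in readMessages relies on ExecutorService.invokeAll with a timeout: when the time is up, tasks that have not finished are cancelled and the blocked thread is interrupted. Below is a minimal sketch of that pattern using only the JDK. A BlockingQueue stands in for the JMS consumer, and the names TimedReceiveLoop and receiveFor are hypothetical; this is an illustration of the technique under those assumptions, not a drop-in replacement for the listing above.

```java
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedReceiveLoop {
    // Takes messages off the queue until the time limit elapses and
    // returns how many were received.
    static int receiveFor(BlockingQueue<String> queue, long limit, TimeUnit unit)
            throws InterruptedException {
        AtomicInteger count = new AtomicInteger();
        Callable<Void> loop = () -> {
            while (true) {
                // take() blocks until a message is available. When invokeAll
                // cancels the task, take() throws InterruptedException,
                // which ends the loop.
                queue.take();
                count.incrementAndGet();
            }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Returns when the task completes or the limit expires,
            // whichever comes first; unfinished tasks are cancelled.
            executor.invokeAll(Collections.singletonList(loop), limit, unit);
        } finally {
            executor.shutdownNow();
        }
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("one");
        queue.add("two");
        queue.add("three");
        int received = receiveFor(queue, 500, TimeUnit.MILLISECONDS);
        System.out.println("Received " + received + " messages before the timeout");
    }
}
```

The loop itself never decides to stop; the timeout passed to invokeAll does. That is why the program keeps waiting for the full period even after the queue is empty.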

Example Execution:

1. Start ActiveMQ and run the program created in the post “Sending Messages To ActiveMQ”. The queue should have some messages now.

[Screenshot: ActiveMQ queue with messages waiting to be read]

 

2. Now run this program. On the Eclipse console you should see the messages being browsed first and then read.

3. The browsing operation doesn’t change the state of the queue. It just reads the content of the queue and displays it. The consume operation, on the other hand, actually moves the messages from ActiveMQ to the client program. The queue count in ActiveMQ should now be “0”, and the program will wait for 10 seconds before it completes.


4. You should see the read messages converted to files in the ReceivedFiles directory of your Eclipse project.

[Screenshot: files created from the received messages in the ReceivedFiles directory]
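The file-writing part of the receiver boils down to copying bytes in 4 KB chunks into a file under ReceivedFiles. Here is a minimal sketch of that step on its own, using only the JDK. An InputStream stands in for the message body, the names ChunkedFileWriter and writeInChunks are hypothetical, and, unlike the listing above, this version creates the target directory if it is missing and overwrites an existing file instead of appending to it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChunkedFileWriter {
    // Copies the stream to targetDir/fileName in 4 KB chunks and returns
    // the path of the written file.
    static Path writeInChunks(InputStream in, Path targetDir, String fileName)
            throws IOException {
        // Does nothing if the directory already exists.
        Files.createDirectories(targetDir);
        Path target = targetDir.resolve(fileName);
        byte[] buffer = new byte[4096];
        int read;
        // try-with-resources flushes and closes the file even on failure.
        try (OutputStream out = Files.newOutputStream(target)) {
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[10000];
        // A temporary directory keeps the sketch from touching the project.
        Path dir = Files.createTempDirectory("demo").resolve("ReceivedFiles");
        Path file = writeInChunks(new ByteArrayInputStream(payload), dir, "payload.bin");
        System.out.println("Wrote " + Files.size(file) + " bytes to " + file);
    }
}
```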

 
