Sending Messages To ActiveMQ Queues

In this tutorial, we will learn how to send different types of messages to ActiveMQ queues.

Example Scenario:

We need to read java String and different types of files and convert them to JMS messages and send to ActiveMQ queues to be received by some other application.


Project artifacts:

 1. Technologies used –

a. Eclipse IDE for Java EE developer 4.5.0.20150621-1200 (Mars)
b. Preinstalled Maven with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.13.0

Jars used – 

In case you are not using maven, please find below the list of jars used in this java application:

activemq-client-5.13.0.jar
slf4j-api-1.7.13.jar
geronimo-jms_1.1_spec-1.1.1.jar
hawtbuf-1.11.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar

2. Eclipse project folder structure:

3. A simple Maven Project

Create a simple maven project skipping archetypes.

Simple Maven Project

4. Maven Dependencies

file: pom.xml. Maven unit of work to declare project dependencies, compile code and call main method of java JMS client. It has Active MQ client dependency and plugin to execute our class.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javanbeyond</groupId>
    <artifactId>QueueMessageSender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>com.javanbeyond.QueueMessageSender</mainClass>
                            <arguments>
                                <argument>localhost</argument>
                                <argument>61616</argument>
                            </arguments>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

5. POJO

file: MyMessage.java. A Serializable POJO class whose objects will be sent to Queues.

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
    /**
     
     */
    private static final long serialVersionUID = 1L;
    int stockPrice;
    String stockName;
    String stockCompany;

    public int getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(int stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public String getStockCompany() {
        return stockCompany;
    }

    public void setStockCompany(String stockCompany) {
        this.stockCompany = stockCompany;
    }

    public String toString() {
        return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }

}

 

6. ActiveMQ Message Producer

file:QueueMessageSender.java. This is the java client program for sending messages to Queues. JMS supports 5 different types of messages, namely TextMessages, MapMessages, BytesMessages, StreamMessages and ObjectMessages. The below client depicts all of the messages in action in a client role.

  1. createAndSendTextMessage : Text messages are the simplest of the JMS messages which takes string as a body of the message. This method creates 100 text messages and sets integer, float and String property of the message before sending it to ActiveMQ.
  2. createAndSendObjectMessage : Object message takes any java serializable object as message body. This method creates 100 object messages of type MyMessage and sets integer property of the message. Please note that the class MyMessage is serializable.
  3. createAndSendMapMessage : Map messages consist of key value pairs where key is generally String while value can be any of the java primitive types. This method creates 100 map messages of different types which includes float, String, Integer and Double. You can even set Byte, Long, Char, Boolean and Object values in Map message.
  4. createAndSendStreamMessage : A stream message is a stream of java primitives that should be read sequentially. In this method, a stream message is created and a file is read and written to Stream message as bytes.
  5. createAndSendBytesMessage : Bytes messages are used to read a stream of uninterpreted bytes. Two different file types, namely pdf and jpg are read in this method and sent to ActiveMQ as messages.

package com.javanbeyond;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import javax.jms.*;

public class QueueMessageSender {

    ActiveMQConnectionFactory factory = null;
    Connection connection = null;
    Session session = null;
    Destination dest = null;
    MessageProducer producer = null;
    public static Boolean TRANSACTIONAL = false;

    private void init(String host, int port)
            throws JMSException {
        // Creating Factory for connection
        factory = new ActiveMQConnectionFactory(
                "tcp://" + host + ":" + port);
        connection = factory.createConnection();
        connection.start();
        // in this session, client would send acknowledgement to broker at
        // the
        // end of sending all the messages.
        session = connection.createSession(TRANSACTIONAL,
                Session.AUTO_ACKNOWLEDGE);
        // If the queue is not there, it will be created. if it is there, it
        // will be used.
        dest = new ActiveMQQueue("FirstQueue");
        producer = session.createProducer(dest);
        // This is test run, so messages may be lost if ActiveMQ is
        // restarted
        // with messages in it.
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    }

    public static void main(String[] argsthrows JMSException {
        QueueMessageSender queueMessageSender = new QueueMessageSender();
        // Prepares connections and creates a queue, if non exists.
        queueMessageSender.init(args[0], Integer.parseInt(args[1]));
        // Create a Text Message
         queueMessageSender.createAndSendTextMessage();
        // Create an Object Message
         queueMessageSender.createAndSendObjectMessage();
        // Create a Map Message
         queueMessageSender.createAndSendMapMessage();
        // Create a Stream Message
         queueMessageSender.createAndSendStreamMessage();
        // Create a Bytes Message
         queueMessageSender.createAndSendBytesMessage();
        System.out.println("All Messages sent.");
        queueMessageSender.producer.close();
        queueMessageSender.session.close();
        queueMessageSender.connection.close();

    }

    // Create 100 text messages with integer, float and string message headers
    // and send it to Queues
    public void createAndSendTextMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            // Adding text to the body
            TextMessage msg = session.createTextMessage("Message " + i);
            msg.setJMSType("TEXT");
            // Setting integer property
            msg.setIntProperty("id", i);
            // Setting float property
            msg.setFloatProperty("idSqrt"(floatMath.sqrt(i));
            // Every even number
            if (i % == 0) {
                msg.setStringProperty("for " + i, "john");
            else {
                msg.setStringProperty("for " + i, "doe");
            }
            producer.send(msg);
        }
        System.out.println("Text Messages sent.");
    }

    // Create 100 object messages with stock price, name and company name in it.
    public void createAndSendObjectMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MyMessage mymessage = new MyMessage();
            mymessage.setStockPrice(i);
            mymessage.setStockName("Stock " + i);
            mymessage.setStockCompany("Company " + i);
            // Adding Object to the body
            ObjectMessage msg = session.createObjectMessage(mymessage);
            msg.setJMSType("OBJECT");
            // Adding an int property
            msg.setIntProperty("idSquare"(intMath.pow(i, 2));
            producer.send(msg);
        }
        System.out.println("Object Messages sent.");
    }

    // Create 100 Map messages
    public void createAndSendMapMessage() throws JMSException {
        for (int i = 1; i <= 100; i++) {
            MapMessage mapMsg = session.createMapMessage();
            // Set Map Values with Keys
            mapMsg.setJMSType("MAP");
            mapMsg.setInt("MessageNum", i);
            mapMsg.setDouble("Log", Math.log10(i));
            mapMsg.setDouble("Square Root", Math.sqrt(i));
            mapMsg.setString("Cube Root", String.valueOf(Math.cbrt(i)));
            mapMsg.setString("Exponential", String.valueOf(Math.exp(i)));
            mapMsg.setInt("Square"(intMath.pow(i, 2));
            mapMsg.setInt("Cube"(intMath.pow(i, 3));
            mapMsg.setFloat("tan\u03B8"(floatMath.tan(i));
            mapMsg.setFloat("sin\u03B8"(floatMath.sin(i));
            mapMsg.setFloat("cos\u03B8"(floatMath.cos(i));
            producer.send(mapMsg);
        }
        System.out.println("Map Messages sent.");

    }

    public void createAndSendStreamMessage() throws JMSException {
        StreamMessage streamMsg = session.createStreamMessage();
        streamMsg.setJMSType("STREAM");
        // Setting integer property
        streamMsg.setIntProperty("id"1);
        // Setting object
        streamMsg.writeObject(new Integer(2));
        // Setting String. Please note that the stream message must be read
        // at consumer side with the same order as it is written in producer
        // side.
        streamMsg.writeString("Stream String");
        File streamFile = new File("src/main/resources/ForStreamMessage.txt");
        streamMsg.setStringProperty("fileNameExtension", streamFile.getName());
        streamMsg.setLongProperty("fileSize", streamFile.length());
        try {
            streamMsg.writeBytes(Files.readAllBytes(streamFile.toPath()));
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(streamMsg);
        System.out.println("Stream Messages sent.");
    }

    // Reads a file and creates a byte message from its content.

    public void createAndSendBytesMessage() throws JMSException {
        BytesMessage bytesMsgPdf = session.createBytesMessage();
        bytesMsgPdf.setJMSType("BYTES");
        // Create message from pdf file
        try {
            bytesMsgPdf.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/pdf-sample.pdf").toPath()));
            bytesMsgPdf.setStringProperty("fileExtension""pdf");
            bytesMsgPdf.setStringProperty("fileName""SampleFileName");
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgPdf);
        // Create message from jpg file
        BytesMessage bytesMsgJpg = session.createBytesMessage();
        bytesMsgJpg.setJMSType("BYTES");
        try {
            bytesMsgJpg.writeBytes(Files.readAllBytes(
                    new File("src/main/resources/Sample.jpg").toPath()));
            bytesMsgJpg.setStringProperty("fileExtension""jpg");
            bytesMsgJpg.setStringProperty("fileName""SampleJpg");
        catch (IOException e) {
            e.printStackTrace();
        }
        producer.send(bytesMsgJpg);
        System.out.println("Bytes Message sent.");
    }
}

Example Execution:

  1. Start ActiveMQ and run the client program by executing “maven clean and install”.
  2. Once it finishes, open your favorite browser and type “http://localhost:8161/admin”
  3. Provide username as “admin” and password as “admin”, if asked for.
  4. Click on Queue Tab and you should see a Queue named “FirstQueue” created with 303 messages in it. Click on “FirstQueue” queue and you will see all the different types of messages with their JMS message ids and types.

Leave a Reply

Back to Top
%d bloggers like this: