Create non-durable Topic Subscriber for ActiveMQ

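In this tutorial, we are going to learn how to consume messages from an ActiveMQ topic as a non-durable message subscriber. A non-durable subscriber receives only the messages that are published while it is connected; anything sent to the topic while it is offline is not delivered to it later.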

Example Scenario:

We need to receive different types of messages from an ActiveMQ topic using a standalone Java application that acts as a non-durable message subscriber.
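
To make the non-durable behaviour concrete before touching the broker, here is a minimal sketch of a toy in-memory topic. It is not the ActiveMQ API: the names ToyTopic and NonDurableDemo are hypothetical and exist only for illustration. It shows the two properties this tutorial relies on: every subscriber gets its own copy of a message, and a message published before a subscriber registers is never delivered to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy model for illustration only; this is NOT how ActiveMQ is implemented.
final class NonDurableDemo {

  // Hypothetical in-memory topic: it keeps subscribers, never messages.
  static final class ToyTopic {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    // A subscriber only sees messages published after this call.
    void subscribe(Consumer<String> subscriber) {
      subscribers.add(subscriber);
    }

    // Hands one copy to every currently registered subscriber; nothing is stored.
    void publish(String message) {
      for (Consumer<String> subscriber : subscribers) {
        subscriber.accept(message);
      }
    }
  }

  static List<String> run() {
    ToyTopic topic = new ToyTopic();
    List<String> received = new ArrayList<>();
    topic.publish("sent before anyone subscribed"); // nobody is listening, so it is lost
    topic.subscribe(m -> received.add("Listener1: " + m));
    topic.subscribe(m -> received.add("Listener2: " + m));
    topic.publish("Hello World !!!");
    return received;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Running the sketch prints one line per listener for the second message only, which is the same pattern we expect from the real subscribers later in this tutorial.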


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE Developers 4.5.0.20150621-1200 (Mars)
b. Maven (the version bundled with Eclipse Mars)
c. Java 1.8
d. ActiveMQ 5.12.0

2. Eclipse project folder structure:

[Screenshot: NonDurable ActiveMQ subscriber]

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

[Screenshot: Simple Maven Project]

4. Maven Dependencies

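Paste the content below into pom.xml. It declares the ActiveMQ client dependency and the exec-maven-plugin, which runs our main class. Because the plugin's java goal is bound to the test phase, any build that reaches that phase (for example “install”) starts the application with localhost and 61616 as arguments.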

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>ActivemqTopicClientNonDurable</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.TopicListenerInitiator</mainClass>
							<arguments>
								<argument>localhost</argument>
								<argument>61616</argument>
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

5. POJO

A serializable class whose instances can be received from the topic as object messages.

file:MyMessage.java

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
  /** Version identifier used by Java serialization. */
  private static final long serialVersionUID = 1L;
  int stockPrice;
  String stockName;
  String stockCompany;

  public int getStockPrice() {
    return stockPrice;
  }

  public void setStockPrice(int stockPrice) {
    this.stockPrice = stockPrice;
  }

  public String getStockName() {
    return stockName;
  }

  public void setStockName(String stockName) {
    this.stockName = stockName;
  }

  public String getStockCompany() {
    return stockCompany;
  }

  public void setStockCompany(String stockCompany) {
    this.stockCompany = stockCompany;
  }

  public String toString() {
    return stockName + " ; " + stockCompany + " ; " + stockPrice;
  }

}
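
A JMS ObjectMessage carries a serialized Java object, which is why the class implements Serializable. The following sketch checks the serialization round trip with plain JDK streams, without any broker involved. The Quote class is a hypothetical stand-in with the same three fields as MyMessage, and the sample values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationRoundTrip {

  // Hypothetical stand-in for MyMessage (same fields, same toString format).
  static final class Quote implements Serializable {
    private static final long serialVersionUID = 1L;
    final String stockName;
    final String stockCompany;
    final int stockPrice;

    Quote(String stockName, String stockCompany, int stockPrice) {
      this.stockName = stockName;
      this.stockCompany = stockCompany;
      this.stockPrice = stockPrice;
    }

    @Override
    public String toString() {
      return stockName + " ; " + stockCompany + " ; " + stockPrice;
    }
  }

  // Serializes the value to bytes and reads it back as a new object.
  static Object copyOf(Serializable value) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(copyOf(new Quote("ACME", "Acme Corp", 42)));
  }
}
```

If a field type were not serializable, writeObject would fail here in the same way it would fail when a producer tries to send the object.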

6. Main method class

This class starts three threads, one per listener, and each listener receives messages from the same topic “firstTopic”.
In other words, the topic has three subscribers that share a single connection.

file:TopicListenerInitiator.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicListenerInitiator {

  public static void main(String[] args) throws JMSException {
    String host = args[0];
    String port = args[1];
    // Creating Factory for connection
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "tcp://" + host + ":" + port);
    Connection connection = factory.createConnection();
    connection.start();
    // Create 3 new threads as consumers for the topic and start listening
    for (int i = 1; i <= 3; i++) {
      Thread listener = new Thread(
          new TopicListener(connection, "Listener" + i));
      listener.start();
    }
  }
}
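
The main method above reads args[0] and args[1] directly, so it fails with an ArrayIndexOutOfBoundsException when it is started without the arguments configured in pom.xml. One possible way to harden this is sketched below. BrokerUrl and its default values are hypothetical helpers, not part of the original project; the defaults simply repeat the localhost and 61616 arguments from pom.xml.

```java
final class BrokerUrl {
  // Assumed fallbacks; they mirror the arguments configured in pom.xml.
  static final String DEFAULT_HOST = "localhost";
  static final int DEFAULT_PORT = 61616;

  // Builds the same "tcp://host:port" string the main method concatenates.
  static String fromArgs(String[] args) {
    String host = args.length > 0 ? args[0] : DEFAULT_HOST;
    // parseInt throws NumberFormatException for a non-numeric port.
    int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range: " + port);
    }
    return "tcp://" + host + ":" + port;
  }

  public static void main(String[] args) {
    System.out.println(fromArgs(args));
  }
}
```

The resulting string could then be passed to the ActiveMQConnectionFactory constructor in place of the inline concatenation.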

7. The actual listener class

This is the class that subscribes to the topic and waits for messages to arrive. Each subscriber creates its own session and consumer, while the single connection is shared by all of them.

file:TopicListener.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

// Listener class whose objects listen to the topic
public class TopicListener implements Runnable {
  public static final boolean TRANSACTIONAL = false;
  public static final String TOPIC_NAME = "firstTopic";

  Connection connection = null;
  String listener = null;

  public TopicListener(Connection connection, String listener) {
    this.connection = connection;
    this.listener = listener;
  }

  public void run() {
    try {
      // With AUTO_ACKNOWLEDGE, the session acknowledges each message
      // automatically once receive() has returned it.
      Session session = this.connection.createSession(TRANSACTIONAL,
          Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createTopic(TOPIC_NAME);
      MessageConsumer consumer = session.createConsumer(destination);
      System.out.println(this.listener + " listening...");
      // Infinite loop: receive() blocks until the next message arrives.
      while (true) {
        Message msg = consumer.receive();
        if (msg instanceof TextMessage) {
          TextMessage text = (TextMessage) msg;
          System.out.println(this.listener + " - Consuming text msg: "
              + text.getText());
        } else if (msg instanceof ObjectMessage) {
          ObjectMessage objmsg = (ObjectMessage) msg;
          Object obj = objmsg.getObject();
          System.out.println(
              this.listener + " - Consuming object msg: " + obj);
        } else {
          System.out.println(this.listener
              + " - Unrecognized Message type " + msg.getClass());
        }

      }
    } catch (JMSException e) {
      e.printStackTrace();
    }

  }
}
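
The listener above loops forever, so the application can only be ended by killing the process. JMS also offers receive(long timeout), which returns null when no message arrives in time, and that makes a stoppable loop possible. The sketch below shows only the loop shape, under a loud assumption: a BlockingQueue stands in for the MessageConsumer, with poll(timeout) playing the role of receive(timeout). StoppableLoop and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class StoppableLoop implements Runnable {
  // Stand-in for the JMS consumer; NOT an ActiveMQ class.
  private final BlockingQueue<String> inbox;
  private final List<String> consumed = new ArrayList<>();
  private volatile boolean running = true;

  StoppableLoop(BlockingQueue<String> inbox) {
    this.inbox = inbox;
  }

  @Override
  public void run() {
    try {
      while (running) {
        // Wait briefly, then re-check the flag instead of blocking forever.
        String msg = inbox.poll(100, TimeUnit.MILLISECONDS);
        if (msg != null) {
          consumed.add(msg);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  void stop() {
    running = false;
  }

  // Queues two messages, lets the worker drain them, then stops it.
  static List<String> demo() {
    BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    inbox.add("Hello World !!!");
    inbox.add("second message");
    StoppableLoop loop = new StoppableLoop(inbox);
    Thread worker = new Thread(loop, "Listener1");
    worker.start();
    try {
      while (!inbox.isEmpty()) {
        Thread.sleep(10);
      }
      loop.stop();
      worker.join(); // after join, reading the consumed list is safe
    } catch (InterruptedException e) {
      loop.stop();
      Thread.currentThread().interrupt();
    }
    return loop.consumed;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

In the real listener, the same idea would mean replacing while (true) with a flag check and calling receive with a timeout, then closing the consumer and session after the loop exits.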

Example Execution:

1. Start ActiveMQ and build the project with the Maven goals “clean install” (“mvn clean install” on the command line). All three listeners start listening to the topic. The order of the printed lines may differ on your machine.

Listener1 listening...
Listener3 listening...
Listener2 listening...

2. Log in to the ActiveMQ admin console (“http://localhost:8161/admin/”) with the user ID and password admin/admin. Click on the “Topics” tab.

[Screenshot: ActiveMQ Topic subscriber]

3. Click on “Send To” in the rightmost column of the topic “firstTopic”, or click on the topic “firstTopic” itself.

4. Fill in the words “Hello World !!!” or anything of your choice.

[Screenshot: ActiveMQAdminConsoleMessages]

5. The moment you press “Send”, you should see the message being consumed by each of the three listeners in your Eclipse console.

Listener1 - Consuming text msg: Hello World !!!
Listener3 - Consuming text msg: Hello World !!!
Listener2 - Consuming text msg: Hello World !!!

 

 
