Create Durable Topic Subscriber for ActiveMQ

In this tutorial, we learn how to receive messages from an ActiveMQ topic as a durable subscriber.

Example Scenario:

We need to receive different types of messages from an ActiveMQ topic using a standalone Java application that acts as a durable subscriber of the topic.


Project artifacts:

1. Technologies used –

a. Eclipse IDE for Java EE Developers 4.5.0.20150621-1200 (Mars)
b. Maven, as preinstalled with Eclipse Mars
c. Java 1.8
d. ActiveMQ 5.12.0

2. Eclipse project folder structure:

DurableTopicSubscriberProj

3. A simple Maven Project

Create a simple Maven project, skipping archetype selection.

Simple Maven Project

4. Maven Dependencies

Paste the content below into pom.xml. It declares the ActiveMQ client dependency and the exec plugin that runs our class (calls its main method) with the broker host and port as arguments.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javanbeyond</groupId>
	<artifactId>ActivemqTopicClientDurable</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.1.1</version>
				<executions>
					<execution>
						<phase>test</phase>
						<goals>
							<goal>java</goal>
						</goals>
						<configuration>
							<mainClass>com.javanbeyond.TopicSubscriber</mainClass>
							<arguments>
								<argument>localhost</argument>
								<argument>61616</argument>
							</arguments>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
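
The two <argument> values in the plugin configuration reach the main method as args[0] and args[1], where they are joined into the broker URL. The following is a minimal sketch of that step in isolation. The class name BrokerUrlSketch and the fallback defaults are assumptions made for illustration; the tutorial's own class reads the arguments directly and has no fallback.

```java
class BrokerUrlSketch {
    // Hypothetical defaults, used only when an argument is missing.
    static final String DEFAULT_HOST = "localhost";
    static final String DEFAULT_PORT = "61616";

    /** Builds the tcp:// broker URL from the host and port arguments. */
    static String brokerUrl(String[] args) {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        String port = args.length > 1 ? args[1] : DEFAULT_PORT;
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // With the pom.xml arguments this prints tcp://localhost:61616
        System.out.println(brokerUrl(args));
    }
}
```

The resulting string is what the subscriber passes to the ActiveMQConnectionFactory constructor.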

5. POJO

A serializable class whose instances are received from the topic as object messages.

file:MyMessage.java

package com.javanbeyond;

import java.io.Serializable;

public class MyMessage implements Serializable {
  /** Version identifier used by Java serialization. */
  private static final long serialVersionUID = 1L;
  int stockPrice;
  String stockName;
  String stockCompany;

  public int getStockPrice() {
    return stockPrice;
  }

  public void setStockPrice(int stockPrice) {
    this.stockPrice = stockPrice;
  }

  public String getStockName() {
    return stockName;
  }

  public void setStockName(String stockName) {
    this.stockName = stockName;
  }

  public String getStockCompany() {
    return stockCompany;
  }

  public void setStockCompany(String stockCompany) {
    this.stockCompany = stockCompany;
  }

  public String toString() {
    return stockName + " ; " + stockCompany + " ; " + stockPrice;
  }

}
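
The class implements Serializable because a JMS object message carries its payload in Java-serialized form. The sketch below shows roughly what happens to such an object on its way through a broker, using only the JDK: it is written to bytes and read back as a new instance. StockQuote is a hypothetical stand-in for MyMessage so that the example compiles on its own, and the helper names are likewise illustrative; none of this is ActiveMQ code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTripSketch {

    // Hypothetical stand-in for MyMessage (same three fields, same toString format).
    static class StockQuote implements Serializable {
        private static final long serialVersionUID = 1L;
        final String stockName;
        final String stockCompany;
        final int stockPrice;

        StockQuote(String stockName, String stockCompany, int stockPrice) {
            this.stockName = stockName;
            this.stockCompany = stockCompany;
            this.stockPrice = stockPrice;
        }

        @Override
        public String toString() {
            return stockName + " ; " + stockCompany + " ; " + stockPrice;
        }
    }

    /** Serializes the value to a byte array, as a sender would. */
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the object from its bytes, as a receiver would. */
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        StockQuote sent = new StockQuote("ACME", "Acme Corp", 42);
        Object received = fromBytes(toBytes(sent));
        System.out.println(received); // prints "ACME ; Acme Corp ; 42"
    }
}
```

The receiving side needs the same class on its classpath, which is why MyMessage is part of the subscriber project.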

6. Subscriber class

This class creates three subscribers to the topic, each with a different subscription name.
In other words, the topic has three durable subscribers, all sharing one connection and one client ID.

file:TopicSubscriber.java

package com.javanbeyond;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber implements MessageListener {
  public static final boolean TRANSACTIONAL = false;
  public static final String TOPIC_NAME = "firstTopic";

  public static void main(String[] args) throws JMSException {
    String host = args[0];
    String port = args[1];
    // Creating Factory for connection
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "tcp://" + host + ":" + port);
    Connection connection = factory.createConnection();
    // Setting unique client id for durable subscriber
    connection.setClientID("MyListener");
    connection.start();
    // Create 3 new consumers for the topic
    for (int i = 1; i <= 3; i++) {
      Session session = connection.createSession(TRANSACTIONAL,
          Session.AUTO_ACKNOWLEDGE);
      Topic destination = session.createTopic(TOPIC_NAME);
      MessageConsumer consumer = session
          .createDurableSubscriber(destination, "Listener" + i);
      // Setting the listener class whose onMessage method will be called
      // with message as argument
      consumer.setMessageListener(new TopicSubscriber());
    }
  }

  // Called by the JMS provider for every message delivered to a subscription
  @Override
  public void onMessage(Message msg) {
    try {
      if (msg instanceof TextMessage) {
        TextMessage text = (TextMessage) msg;
        System.out.println(" - Consuming text msg: " + text.getText());
      } else if (msg instanceof ObjectMessage) {
        ObjectMessage objmsg = (ObjectMessage) msg;
        Object obj = objmsg.getObject();
        System.out.println(" - Consuming object msg: " + obj);
      } else {
        System.out.println(
            " - Unrecognized Message type " + msg.getClass());
      }
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}
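
A durable subscription is identified by the client ID together with the subscription name, and the broker keeps messages for it even while the subscriber is disconnected. The toy model below illustrates that behaviour with plain Java collections: each subscription name gets its own backlog, and every published message is copied into all of them. It is only a sketch of the concept under these simplifying assumptions, not how ActiveMQ is implemented, and DurableTopicSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class DurableTopicSketch {
    // One backlog per subscription name; it persists while the subscriber is offline.
    private final Map<String, Queue<String>> backlog = new LinkedHashMap<>();

    /** Registers a durable subscription; registering the same name again changes nothing. */
    void subscribe(String subscriptionName) {
        backlog.putIfAbsent(subscriptionName, new ArrayDeque<>());
    }

    /** Copies the message into the backlog of every registered subscription. */
    void publish(String message) {
        for (Queue<String> pending : backlog.values()) {
            pending.add(message);
        }
    }

    /** Drains and returns everything waiting for one subscription. */
    List<String> receiveAll(String subscriptionName) {
        List<String> delivered = new ArrayList<>();
        Queue<String> pending = backlog.get(subscriptionName);
        if (pending != null) {
            while (!pending.isEmpty()) {
                delivered.add(pending.poll());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        for (int i = 1; i <= 3; i++) {
            topic.subscribe("Listener" + i);
        }
        // Published while nobody is reading; each subscription still keeps a copy.
        topic.publish("Hi");
        for (int i = 1; i <= 3; i++) {
            System.out.println("Listener" + i + " -> " + topic.receiveAll("Listener" + i));
        }
    }
}
```

Each of the three subscriptions receives its own copy of the message, which matches the three console lines shown in the example execution.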

Example Execution:

1. Start ActiveMQ and build the project with Maven clean and install (“mvn clean install”). The exec plugin is bound to the test phase, so the build also runs TopicSubscriber, and all three listeners start listening to the topic “firstTopic”.
2. Log in to the ActiveMQ admin console (“http://localhost:8161/admin/”) with the user ID/password admin/admin. Click on the “Topics” tab.

DurableSubscriptionActiveMQ

3. Click on “firstTopic”, enter any text in the message body, and click “Send”.

DurableSubscriberMessage

4. Your Eclipse console should show the message three times, because each of the three subscribers receives its own copy.

 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.
 - Consuming text msg: Hi, This message is for Durable Subscribers.

 
