XenQTT Documentation

Running Applications

XenQTT is released as a single jar. To run any XenQTT application you execute the jar:

java -jar xenqtt-version.jar
						
Where version is the XenQTT version number which is part of the JAR file name. The command above will give you usage details. For much more detailed info execute this:
java -jar xenqtt-version.jar help
						

Maven Integration

XenQTT is released to the central maven repository. Add this dependency element with the latest version number to your pom.xml:

<dependency>
    <groupId>net.sf.xenqtt</groupId>
    <artifactId>xenqtt</artifactId>
    <version>...</version>
</dependency>	

Using the Client API

Overview

XenQTT provides two distinct MQTT clients for interfacing with MQTT brokers. One is a synchronous client where all operations to the broker (connect, publish, subscribe, etc.) are blocking. The other is an asynchronous client where operations to the broker are non-blocking. In the asynchronous client, broker events are delivered via callbacks on a specific interface that the user must implement.

The following sections show how to use both the synchronous and asynchronous clients. You are taken through a subscriber and publisher model where the subscriber attempts to build up a musical catalog of classic rock with a paired publisher that is only too happy to provide.

All examples are included in the xenqtt jar in the net.sf.xenqtt.examples package.

Synchronous Client Example

Synchronous Subscriber

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.MqttClientListener;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.client.SyncMqttClient;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Builds music catalogs from years gone by.
 * <p>
 * Connects with the synchronous client, subscribes to a few classic rock topics, collects every payload received during a fixed 30 second window, logs what
 * was collected and finally unsubscribes and disconnects.
 */
public class MusicSubscriber {

	private static final Logger log = Logger.getLogger(MusicSubscriber.class);

	/**
	 * Runs the subscriber example end to end.
	 *
	 * @param args
	 *            Ignored
	 */
	public static void main(String... args) throws Throwable {
		// The catalog is written by the listener callback and read by this method, so it is wrapped in a synchronized list.
		final List<String> catalog = Collections.synchronizedList(new ArrayList<String>());
		MqttClientListener listener = new MqttClientListener() {

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				catalog.add(message.getPayloadString());
				message.ack();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause != null) {
					log.error("Disconnected from the broker due to an exception.", cause);
				} else {
					log.info("Disconnecting from the broker.");
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

		};

		// Build your client. This client is a synchronous one so all interaction with the broker will block until said interaction completes.
		// The broker URL matches the one used by the paired publisher example so both ends talk to the same broker.
		SyncMqttClient client = new SyncMqttClient("tcp://mqtt-broker:1883", listener, 5);
		try {
			// Connect to the broker with a specific client ID. Only if the broker accepted the connection shall we proceed.
			ConnectReturnCode returnCode = client.connect("musicLover", true);
			if (returnCode != ConnectReturnCode.ACCEPTED) {
				log.error("Unable to connect to the MQTT broker. Reason: " + returnCode);
				return;
			}

			// Create your subscriptions. In this case we want to build up a catalog of classic rock.
			List<Subscription> subscriptions = new ArrayList<Subscription>();
			subscriptions.add(new Subscription("grand/funk/railroad", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("jefferson/airplane", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("seventies/prog/#", QoS.AT_MOST_ONCE));
			client.subscribe(subscriptions);

			// Build up your catalog. After a while you've waited long enough so move on.
			try {
				Thread.sleep(30000);
			} catch (InterruptedException ignored) {
				// Deliberately ignored: an early wake-up simply shortens the collection window and we carry on with the report.
			}

			// Report on what we have found. A synchronized list must be locked manually while it is iterated, otherwise a message arriving
			// mid-iteration can cause a ConcurrentModificationException.
			synchronized (catalog) {
				for (String record : catalog) {
					log.debug("Got a record: " + record);
				}
			}

			// We are done. Unsubscribe from further updates.
			List<String> topics = new ArrayList<String>();
			for (Subscription subscription : subscriptions) {
				topics.add(subscription.getTopic());
			}
			client.unsubscribe(topics);
		} catch (Exception ex) {
			log.error("An unexpected exception has occurred.", ex);
		} finally {
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

}
						

Synchronous Publisher

import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.MqttClientListener;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.SyncMqttClient;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Produces hit music from days gone by.
 * <p>
 * Connects with the synchronous client and publishes a fixed catalog of album titles, one message per album, before disconnecting.
 */
public class MusicProducer {

	private static final Logger log = Logger.getLogger(MusicProducer.class);

	/** Topic/album pairs in the order in which they are published. */
	private static final String[][] CATALOG = {
		{ "grand/funk/railroad", "On Time" },
		{ "grand/funk/railroad", "E Pluribus Funk" },
		{ "jefferson/airplane", "Surrealistic Pillow" },
		{ "jefferson/airplane", "Crown of Creation" },
		{ "seventies/prog/rush", "2112" },
		{ "seventies/prog/rush", "A Farewell to Kings" },
		{ "seventies/prog/rush", "Hemispheres" },
	};

	/**
	 * Runs the publisher example end to end.
	 *
	 * @param args
	 *            Ignored
	 */
	public static void main(String... args) throws Throwable {
		// The synchronous client blocks on every broker interaction until that interaction completes.
		MqttClient client = new SyncMqttClient("tcp://mqtt-broker:1883", new ProducerListener(), 5);
		try {
			ConnectReturnCode returnCode = client.connect("musicProducer", false, "music-user", "music-pass");
			if (returnCode == ConnectReturnCode.ACCEPTED) {
				// Publish the musical catalog, one album per message.
				for (String[] album : CATALOG) {
					client.publish(new PublishMessage(album[0], QoS.AT_MOST_ONCE, album[1]));
				}
			} else {
				log.error("Unable to connect to the broker. Reason: " + returnCode);
			}
		} catch (Exception ex) {
			log.error("An exception prevented the publishing of the full catalog.", ex);
		} finally {
			// Whatever happened above, make sure the connection is released.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

	/**
	 * Listener for a client that only publishes: inbound messages are unexpected and merely logged, as are disconnects.
	 */
	private static final class ProducerListener implements MqttClientListener {

		@Override
		public void publishReceived(MqttClient client, PublishMessage message) {
			log.warn("Received a message when no subscriptions were active. Check your broker ;)");
		}

		@Override
		public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
			if (cause == null) {
				log.info("Disconnected from the broker.");
			} else {
				log.error("Disconnected from the broker due to an exception.", cause);
			}

			if (reconnecting) {
				log.info("Attempting to reconnect to the broker.");
			}
		}
	}

}
						
Asynchronous Client Example

Asynchronous Subscriber

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import net.sf.xenqtt.client.AsyncClientListener;
import net.sf.xenqtt.client.AsyncMqttClient;
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Builds music catalogs from years gone by.
 * <p>
 * Asynchronous variant: broker interactions return immediately and their outcomes are reported through the {@link AsyncClientListener} callbacks. A latch is
 * used to wait for the outcome of the connect attempt before subscribing.
 */
public class MusicSubscriberAsync {

	private static final Logger log = Logger.getLogger(MusicSubscriberAsync.class);

	/**
	 * Runs the asynchronous subscriber example end to end.
	 *
	 * @param args
	 *            Ignored
	 */
	public static void main(String... args) throws Throwable {
		// Released by the 'connected' callback once the broker has answered the connect request.
		final CountDownLatch connectLatch = new CountDownLatch(1);
		final AtomicReference<ConnectReturnCode> connectReturnCode = new AtomicReference<ConnectReturnCode>();
		// The catalog is written by the listener callback and read by this method, so it is wrapped in a synchronized list.
		final List<String> catalog = Collections.synchronizedList(new ArrayList<String>());
		AsyncClientListener listener = new AsyncClientListener() {

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				catalog.add(message.getPayloadString());
				message.ack();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause != null) {
					log.error("Disconnected from the broker due to an exception.", cause);
				} else {
					log.info("Disconnecting from the broker.");
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

			@Override
			public void connected(MqttClient client, ConnectReturnCode returnCode) {
				// Record the result before releasing the latch so the waiting thread always observes it.
				connectReturnCode.set(returnCode);
				connectLatch.countDown();
			}

			@Override
			public void published(MqttClient client, PublishMessage message) {
				// We do not publish so this should never be called, in theory ;).
			}

			@Override
			public void subscribed(MqttClient client, Subscription[] requestedSubscriptions, Subscription[] grantedSubscriptions, boolean requestsGranted) {
				if (!requestsGranted) {
					log.error("Unable to subscribe to the following subscriptions: " + Arrays.toString(requestedSubscriptions));
				}

				log.debug("Granted subscriptions: " + Arrays.toString(grantedSubscriptions));
			}

			@Override
			public void unsubscribed(MqttClient client, String[] topics) {
				log.debug("Unsubscribed from the following topics: " + Arrays.toString(topics));
			}

		};

		// Build your client. This client is an asynchronous one so all interaction with the broker will be non-blocking.
		AsyncMqttClient client = new AsyncMqttClient("tcp://mqtt-broker:1883", listener, 5);
		try {
			// Connect to the broker with a specific client ID. The call is non-blocking, so wait for the 'connected' callback before reading the
			// return code; without the wait the code is still null here and the example would always report a failed connection.
			client.connect("musicLover", true);
			connectLatch.await();

			// Only if the broker accepted the connection shall we proceed.
			ConnectReturnCode returnCode = connectReturnCode.get();
			if (returnCode == null || returnCode != ConnectReturnCode.ACCEPTED) {
				log.error("Unable to connect to the MQTT broker. Reason: " + returnCode);
				return;
			}

			// Create your subscriptions. In this case we want to build up a catalog of classic rock.
			List<Subscription> subscriptions = new ArrayList<Subscription>();
			subscriptions.add(new Subscription("grand/funk/railroad", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("jefferson/airplane", QoS.AT_MOST_ONCE));
			subscriptions.add(new Subscription("seventies/prog/#", QoS.AT_MOST_ONCE));
			client.subscribe(subscriptions);

			// Build up your catalog. After a while you've waited long enough so move on.
			try {
				Thread.sleep(30000);
			} catch (InterruptedException ignored) {
				// Deliberately ignored: an early wake-up simply shortens the collection window and we carry on with the report.
			}

			// Report on what we have found. A synchronized list must be locked manually while it is iterated, otherwise a message arriving
			// mid-iteration can cause a ConcurrentModificationException.
			synchronized (catalog) {
				for (String record : catalog) {
					log.debug("Got a record: " + record);
				}
			}

			// We are done. Unsubscribe at this time.
			List<String> topics = new ArrayList<String>();
			for (Subscription subscription : subscriptions) {
				topics.add(subscription.getTopic());
			}
			client.unsubscribe(topics);
		} catch (Exception ex) {
			log.error("An unexpected exception has occurred.", ex);
		} finally {
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

}
						

Asynchronous Publisher

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import net.sf.xenqtt.client.AsyncClientListener;
import net.sf.xenqtt.client.AsyncMqttClient;
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.QoS;

import org.apache.log4j.Logger;

/**
 * Produces hit music from days gone by.
 * <p>
 * Asynchronous variant: the connect request returns immediately, so the example waits on a latch for the broker's answer before it starts publishing.
 */
public class MusicProducerAsync {

	private static final Logger log = Logger.getLogger(MusicProducerAsync.class);

	/**
	 * Runs the asynchronous publisher example end to end.
	 *
	 * @param args
	 *            Ignored
	 */
	public static void main(String... args) throws Throwable {
		final AtomicReference<ConnectReturnCode> brokerAnswer = new AtomicReference<ConnectReturnCode>();
		final CountDownLatch answerReceived = new CountDownLatch(1);

		AsyncClientListener listener = new AsyncClientListener() {

			@Override
			public void connected(MqttClient client, ConnectReturnCode returnCode) {
				// Store the answer first, then wake up the thread waiting on the latch.
				brokerAnswer.set(returnCode);
				answerReceived.countDown();
			}

			@Override
			public void disconnected(MqttClient client, Throwable cause, boolean reconnecting) {
				if (cause == null) {
					log.info("Disconnected from the broker.");
				} else {
					log.error("Disconnected from the broker due to an exception.", cause);
				}

				if (reconnecting) {
					log.info("Attempting to reconnect to the broker.");
				}
			}

			@Override
			public void publishReceived(MqttClient client, PublishMessage message) {
				log.warn("Received a message when no subscriptions were active. Check your broker ;)");
			}

			@Override
			public void published(MqttClient client, PublishMessage message) {
				// Nothing to do for this example.
			}

			@Override
			public void subscribed(MqttClient client, Subscription[] requestedSubscriptions, Subscription[] grantedSubscriptions, boolean requestsGranted) {
				// This client never subscribes.
			}

			@Override
			public void unsubscribed(MqttClient client, String[] topics) {
				// This client never unsubscribes.
			}

		};

		// The asynchronous client never blocks on broker interaction; results arrive through the listener above.
		MqttClient client = new AsyncMqttClient("tcp://mqtt-broker:1883", listener, 5);
		try {
			// Ask for a connection and hold here until the broker has told us whether we may publish at all.
			client.connect("musicProducerAsync", false, "music-user", "music-pass");
			answerReceived.await();

			ConnectReturnCode returnCode = brokerAnswer.get();
			if (returnCode == ConnectReturnCode.ACCEPTED) {
				publishCatalog(client);
			} else {
				// The broker bounced us (a missing return code is treated the same way). We are done.
				log.error("The broker rejected our attempt to connect. Reason: " + returnCode);
			}
		} catch (Exception ex) {
			log.error("An exception prevented the publishing of the full catalog.", ex);
		} finally {
			// Whatever happened above, make sure the connection is released.
			if (!client.isClosed()) {
				client.disconnect();
			}
		}
	}

	/**
	 * Publishes the musical catalog, one album per message, in a fixed order.
	 */
	private static void publishCatalog(MqttClient client) throws Exception {
		String[][] albums = {
			{ "grand/funk/railroad", "On Time" },
			{ "grand/funk/railroad", "E Pluribus Funk" },
			{ "jefferson/airplane", "Surrealistic Pillow" },
			{ "jefferson/airplane", "Crown of Creation" },
			{ "seventies/prog/rush", "2112" },
			{ "seventies/prog/rush", "A Farewell to Kings" },
			{ "seventies/prog/rush", "Hemispheres" },
		};
		for (String[] album : albums) {
			client.publish(new PublishMessage(album[0], QoS.AT_MOST_ONCE, album[1]));
		}
	}

}
						

Using the Mock Broker API

All examples are included in the xenqtt jar in the net.sf.xenqtt.examples package.

Mock Broker - Vanilla

The following example shows how to fire up an instance of the Mock Broker within a running JVM. The mock broker that launches uses the default setup and configuration. No special handlers or eventing rules are employed.
package net.sf.xenqtt.examples;

import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety.
 * <p>
 * The broker runs with the stock {@link MockBrokerHandler}, stays online for a fixed period and is then shut down.
 */
public class GlamBroker {

	/** How long the broker is left running before it is shut down, in milliseconds. */
	private static final long ONLINE_MILLIS = 60000;

	public static void main(String... args) throws InterruptedException {
		MockBroker broker = new MockBroker(new MockBrokerHandler());

		// init() blocks until startup is complete, so the broker is online as soon as it returns.
		broker.init();

		// Clients can now connect to the broker, publish messages, subscribe, etc.
		Thread.sleep(ONLINE_MILLIS);

		// All done. A timeout of 0 waits forever for the shutdown; a value > 0 waits that many milliseconds.
		broker.shutdown(0);
	}

}
						

Mock Broker - Custom Handler

The following example shows how to fire up a mock broker instance in the local JVM. The mock broker is given a custom event handler. This handler is invoked as events move through the broker. Such handlers can be used to customize the behavior of the broker and prove extremely useful in testing.
package net.sf.xenqtt.examples;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import net.sf.xenqtt.message.PubMessage;
import net.sf.xenqtt.message.QoS;
import net.sf.xenqtt.message.SubscribeMessage;
import net.sf.xenqtt.mockbroker.Client;
import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety. This particular broker has a special handler that rejects any and all attempts
 * to interact with country music.
 */
public class GlamNoCountryBroker {

	public static void main(String... args) throws InterruptedException {
		MockBrokerHandler handler = new GlamBrokerHandler();
		MockBroker broker = new MockBroker(handler);

		broker.init(); // Blocks until startup is complete.

		// At this point the broker is online. Clients can connect to it, publish messages, subscribe, etc.
		Thread.sleep(60000);

		// We are done. Shutdown the broker. Wait forever (> 0 means wait that many milliseconds).
		broker.shutdown(0);
	}

	/**
	 * Handler that drops published messages mentioning country music and strips country topics out of subscribe requests before handing them on to the default
	 * handling.
	 */
	private static final class GlamBrokerHandler extends MockBrokerHandler {

		/**
		 * Suppresses any message whose UTF-8 decoded payload contains "Country Music"; everything else gets the default handling.
		 */
		@Override
		public boolean publish(Client client, PubMessage message) throws Exception {
			String payload = new String(message.getPayload(), Charset.forName("UTF-8"));
			if (payload.indexOf("Country Music") > -1) {
				// We don't do that stuff here! Return true to suppress processing of the message
				return true;
			}

			return super.publish(client, message);
		}

		/**
		 * Rebuilds the subscribe request so that it only contains topics that do not mention "country" (case-insensitive), keeping each remaining topic paired
		 * with its originally requested QoS.
		 *
		 * @see net.sf.xenqtt.mockbroker.MockBrokerHandler#subscribe(net.sf.xenqtt.mockbroker.Client, net.sf.xenqtt.message.SubscribeMessage)
		 */
		@Override
		public boolean subscribe(Client client, SubscribeMessage message) throws Exception {
			String[] topics = message.getTopics();
			QoS[] qoses = message.getRequestedQoSes();
			// Typed lists are required: with raw lists toArray(...) yields Object[] which cannot be passed as the String[]/QoS[] constructor arguments below.
			List<String> allowedTopics = new ArrayList<String>();
			List<QoS> allowedQoses = new ArrayList<QoS>();
			int index = 0;
			for (String topic : topics) {
				// Only allow topic subscriptions for topics that don't include country music.
				if (!topic.matches("^.*(?i:country).*$")) {
					allowedTopics.add(topic);
					allowedQoses.add(qoses[index]);
				}

				index++;
			}

			message = new SubscribeMessage(message.getMessageId(), allowedTopics.toArray(new String[0]), allowedQoses.toArray(new QoS[0]));

			return super.subscribe(client, message);
		}

	}

}
						

Please see the JavaDoc for a complete list of the methods that can be overridden on the MockBrokerHandler.

Mock Broker - Event Reporting

The following example shows how to fire up the mock broker in the local JVM. In addition to a custom handler the example shows how to interrogate the broker for certain events that pass through it. The information from these events can be used for many disparate purposes. These are typically application-specific.
package net.sf.xenqtt.examples;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import net.sf.xenqtt.message.MessageType;
import net.sf.xenqtt.message.PubMessage;
import net.sf.xenqtt.message.QoS;
import net.sf.xenqtt.message.SubscribeMessage;
import net.sf.xenqtt.mockbroker.BrokerEvent;
import net.sf.xenqtt.mockbroker.BrokerEventType;
import net.sf.xenqtt.mockbroker.Client;
import net.sf.xenqtt.mockbroker.MockBroker;
import net.sf.xenqtt.mockbroker.MockBrokerHandler;

/**
 * Fires up a mock broker that specializes in routing data of the 'Glam' variety. This particular broker has a special handler that rejects any and all attempts
 * to interact with country music. In addition, it allows for retrieval of events from the broker and their subsequent interrogation.
 */
public class GlamEventReportingBroker {

	/**
	 * Runs the broker for a fixed period, then prints how many events it saw, how many distinct client IDs were involved and the event counts grouped by
	 * broker event type and by MQTT message type.
	 *
	 * @param args
	 *            Ignored
	 */
	public static void main(String... args) throws InterruptedException {
		MockBrokerHandler handler = new GlamBrokerHandler();
		MockBroker broker = new MockBroker(handler);

		broker.init(); // Blocks until startup is complete.

		// At this point the broker is online. Clients can connect to it, publish messages, subscribe, etc.
		Thread.sleep(60000);

		// Examine broker events and report some useful stats about them.
		// The collections are fully typed: with raw types the loop below and the counter look-ups do not compile.
		List<BrokerEvent> events = broker.getEvents();
		Set<String> clientIds = new HashSet<String>();
		Map<BrokerEventType, Integer> eventTypes = new EnumMap<BrokerEventType, Integer>(BrokerEventType.class);
		Map<MessageType, Integer> messageTypes = new EnumMap<MessageType, Integer>(MessageType.class);
		for (BrokerEvent event : events) {
			clientIds.add(event.getClientId());

			BrokerEventType brokerEventType = event.getEventType();
			Integer brokerEventCount = eventTypes.get(brokerEventType);
			if (brokerEventCount == null) {
				brokerEventCount = Integer.valueOf(0);
			}
			eventTypes.put(brokerEventType, Integer.valueOf(brokerEventCount.intValue() + 1));

			// NOTE(review): events that are not tied to an MQTT message presumably carry no message; confirm against the BrokerEvent JavaDoc.
			// Skipping them here avoids a NullPointerException and keeps them out of the per-message-type counts.
			if (event.getMessage() == null) {
				continue;
			}

			MessageType messageType = event.getMessage().getMessageType();
			Integer messageTypeCount = messageTypes.get(messageType);
			if (messageTypeCount == null) {
				messageTypeCount = Integer.valueOf(0);
			}
			messageTypes.put(messageType, Integer.valueOf(messageTypeCount.intValue() + 1));
		}
		System.out.printf("Total events: %d\n", events.size());
		System.out.printf("Total client IDs: %d\n", clientIds.size());

		System.out.println("Counts by broker event type:");
		for (Entry<BrokerEventType, Integer> entry : eventTypes.entrySet()) {
			System.out.printf("\t%s: %d\n", entry.getKey().name(), entry.getValue());
		}

		// println (not printf without a newline) so the first count does not end up on the header line.
		System.out.println("Counts by MQTT message type:");
		for (Entry<MessageType, Integer> entry : messageTypes.entrySet()) {
			System.out.printf("\t%s: %d\n", entry.getKey().name(), entry.getValue());
		}

		// We are done. Shutdown the broker. Wait forever (> 0 means wait that many milliseconds).
		broker.shutdown(0);
	}

	/**
	 * Handler that drops published messages mentioning country music and strips country topics out of subscribe requests before handing them on to the default
	 * handling.
	 */
	private static final class GlamBrokerHandler extends MockBrokerHandler {

		/**
		 * Suppresses any message whose UTF-8 decoded payload contains "Country Music"; everything else gets the default handling.
		 */
		@Override
		public boolean publish(Client client, PubMessage message) throws Exception {
			String payload = new String(message.getPayload(), Charset.forName("UTF-8"));
			if (payload.indexOf("Country Music") > -1) {
				// We don't do that stuff here! Return true to suppress processing of the message
				return true;
			}

			return super.publish(client, message);
		}

		/**
		 * Rebuilds the subscribe request so that it only contains topics that do not mention "country" (case-insensitive), keeping each remaining topic paired
		 * with its originally requested QoS.
		 *
		 * @see net.sf.xenqtt.mockbroker.MockBrokerHandler#subscribe(net.sf.xenqtt.mockbroker.Client, net.sf.xenqtt.message.SubscribeMessage)
		 */
		@Override
		public boolean subscribe(Client client, SubscribeMessage message) throws Exception {
			String[] topics = message.getTopics();
			QoS[] qoses = message.getRequestedQoSes();
			// Typed lists are required: with raw lists toArray(...) yields Object[] which cannot be passed as the String[]/QoS[] constructor arguments below.
			List<String> allowedTopics = new ArrayList<String>();
			List<QoS> allowedQoses = new ArrayList<QoS>();
			int index = 0;
			for (String topic : topics) {
				// Only allow topic subscriptions for topics that don't include country music.
				if (!topic.matches("^.*(?i:country).*$")) {
					allowedTopics.add(topic);
					allowedQoses.add(qoses[index]);
				}

				index++;
			}

			message = new SubscribeMessage(message.getMessageId(), allowedTopics.toArray(new String[0]), allowedQoses.toArray(new QoS[0]));

			return super.subscribe(client, message);
		}

	}

}
						

Related Resources

MQTT Overview