net.sf.xenqtt.message
Class ChannelManagerImpl

java.lang.Object
  extended by net.sf.xenqtt.message.ChannelManagerImpl
All Implemented Interfaces:
ChannelManager

public final class ChannelManagerImpl
extends Object
implements ChannelManager

Uses a single thread and non-blocking NIO to manage one or more MqttChannels. You must call init() before using this manager and shutdown() to shut it down.


Constructor Summary
ChannelManagerImpl(long messageResendIntervalSeconds)
          Use this constructor for the asynchronous API.
ChannelManagerImpl(long messageResendIntervalSeconds, int blockingTimeoutSeconds)
          Use this constructor for the synchronous API.
 
Method Summary
 void attachChannel(MqttChannelRef channel, MessageHandler messageHandler)
          Attaches the specified channel to this manager's control.
 void cancelBlockingCommands(MqttChannelRef channel)
          Cancels all blocking commands for the specified channel.
 void close(MqttChannelRef channel)
          Closes the specified channel.
 void close(MqttChannelRef channel, Throwable cause)
          Closes the specified channel and sends cause to the MessageHandler.channelClosed(MqttChannel, Throwable) callback.
 void detachChannel(MqttChannelRef channel)
          Detaches the specified channel from this manager's control.
 MessageStats getStats(boolean reset)
          Request the current snapshot of statistics that are available from this channel manager.
 List<MqttMessage> getUnsentMessages(MqttChannelRef channel)
          Returns all messages that have not been sent.
 void init()
          Starts this channel manager.
 boolean isRunning()
          Returns true if this manager is running and not yet stopped.
 MqttChannelRef newBrokerChannel(SocketChannel socketChannel, MessageHandler messageHandler)
          Create a new broker side MqttChannelRef for use in exchanging data using the MQTT protocol.
 MqttChannelRef newClientChannel(String host, int port, MessageHandler messageHandler)
          Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol.
 MqttChannelRef newClientChannel(String brokerUri, MessageHandler messageHandler)
          Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol.
 MqttChannelRef newClientChannel(URI brokerUri, MessageHandler messageHandler)
          Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol.
 <T extends MqttMessage> T send(MqttChannelRef channel, MqttMessage message)
          Send a message over a specified channel.
 void shutdown()
          Stops this channel manager.
 void transfer(MqttChannelRef oldChannel, MqttChannelRef newChannel)
          Transfers unsent messages from oldChannel to newChannel and changes oldChannel such that any messages sent to it will actually go to newChannel.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ChannelManagerImpl

public ChannelManagerImpl(long messageResendIntervalSeconds)
Use this constructor for the asynchronous API.

Parameters:
messageResendIntervalSeconds - Seconds between attempts to resend a message for which MqttMessage.isAckable() is true. Use 0 to disable message resends.

ChannelManagerImpl

public ChannelManagerImpl(long messageResendIntervalSeconds,
                          int blockingTimeoutSeconds)
Use this constructor for the synchronous API.

Parameters:
messageResendIntervalSeconds - Seconds between attempts to resend a message for which MqttMessage.isAckable() is true. Use 0 to disable message resends.
blockingTimeoutSeconds - Seconds until a blocked method invocation times out and an MqttTimeoutException is thrown. -1 will create a non-blocking API, 0 will create a blocking API with no timeout, > 0 will create a blocking API with the specified timeout.
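
The three documented ranges of blockingTimeoutSeconds can be summarized in a small, self-contained sketch. BlockingMode and its of() method are hypothetical names used only for illustration and are not part of the library. The text above does not say what values below -1 do, so rejecting them here is an assumption.

```java
// Hypothetical helper, not part of xenqtt: mirrors the documented meaning
// of the blockingTimeoutSeconds constructor argument.
enum BlockingMode {
    NON_BLOCKING, BLOCK_FOREVER, BLOCK_WITH_TIMEOUT;

    static BlockingMode of(int blockingTimeoutSeconds) {
        if (blockingTimeoutSeconds == -1) {
            return NON_BLOCKING;        // -1: non-blocking API
        }
        if (blockingTimeoutSeconds == 0) {
            return BLOCK_FOREVER;       // 0: blocking API with no timeout
        }
        if (blockingTimeoutSeconds > 0) {
            return BLOCK_WITH_TIMEOUT;  // > 0: blocking API, timeout in seconds
        }
        // Values below -1 are not described in the documentation;
        // rejecting them is an assumption of this sketch.
        throw new IllegalArgumentException(
                "undocumented value: " + blockingTimeoutSeconds);
    }
}
```

For example, BlockingMode.of(30) yields BLOCK_WITH_TIMEOUT, which corresponds to a blocked call throwing MqttTimeoutException after 30 seconds.
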
Method Detail

init

public void init()
Description copied from interface: ChannelManager
Starts this channel manager. Must be called before any other methods.

Specified by:
init in interface ChannelManager
See Also:
ChannelManager.init()

shutdown

public void shutdown()
Description copied from interface: ChannelManager
Stops this channel manager. Any other methods called after this have unpredictable results. Closes all open channels. Blocks until shutdown is complete.

Specified by:
shutdown in interface ChannelManager
See Also:
ChannelManager.shutdown()

isRunning

public boolean isRunning()
Specified by:
isRunning in interface ChannelManager
Returns:
True if this manager is running and not yet stopped. This really means the manager's IO thread is running.
See Also:
ChannelManager.isRunning()
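
The init(), shutdown(), and isRunning() contract described above suggests a try/finally pattern so that shutdown() always runs. The sketch below is a minimal illustration of that pattern. Lifecycle, LifecycleExample, and FakeManager are hypothetical stand-ins so the snippet compiles without the library on the classpath; with the library present, a ChannelManagerImpl instance would take the place of FakeManager.

```java
// Stand-in declaring only the three lifecycle methods documented above.
// It is not the real ChannelManager interface.
interface Lifecycle {
    void init();
    void shutdown();
    boolean isRunning();
}

class LifecycleExample {
    // Runs the work between init() and shutdown(). shutdown() sits in a
    // finally block so it is called even when the work throws.
    static void runWith(Lifecycle manager, Runnable work) {
        manager.init();
        try {
            work.run();
        } finally {
            manager.shutdown();
        }
    }
}

// Trivial in-memory implementation used only to exercise the pattern.
class FakeManager implements Lifecycle {
    private boolean running;

    public void init() { running = true; }

    public void shutdown() { running = false; }

    public boolean isRunning() { return running; }
}
```

Because shutdown() is documented to block until shutdown is complete, the call in the finally block can delay the caller; the stand-in does not model that.
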

newClientChannel

public MqttChannelRef newClientChannel(String brokerUri,
                                       MessageHandler messageHandler)
                                throws MqttInterruptedException
Description copied from interface: ChannelManager
Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol. This is the client end of the connection. The broker will have the remote end of the connection. This method only blocks long enough for the channel to be created, not for the TCP connection to happen.

Specified by:
newClientChannel in interface ChannelManager
Parameters:
brokerUri - URI of the broker to connect to. For example, tcp://q.m2m.io:1883.
messageHandler - The message handler to use for all received messages
Returns:
The newly-created channel. The channel may only be safely accessed from the MessageHandler callback methods.
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.newClientChannel(java.lang.String, net.sf.xenqtt.message.MessageHandler)

newClientChannel

public MqttChannelRef newClientChannel(URI brokerUri,
                                       MessageHandler messageHandler)
                                throws MqttInterruptedException
Description copied from interface: ChannelManager
Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol. This is the client end of the connection. The broker will have the remote end of the connection. This method only blocks long enough for the channel to be created, not for the TCP connection to happen. Any exception thrown while initializing the connection, such as UnresolvedAddressException, is rethrown wrapped in a RuntimeException.

Specified by:
newClientChannel in interface ChannelManager
Parameters:
brokerUri - URI of the broker to connect to. For example, tcp://q.m2m.io:1883.
messageHandler - The message handler to use for all received messages
Returns:
The newly-created channel. The channel may only be safely accessed from the MessageHandler callback methods.
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.newClientChannel(java.net.URI, net.sf.xenqtt.message.MessageHandler)

newClientChannel

public MqttChannelRef newClientChannel(String host,
                                       int port,
                                       MessageHandler messageHandler)
                                throws MqttInterruptedException
Description copied from interface: ChannelManager
Create a new client side MqttChannelRef for use in exchanging data using the MQTT protocol. This is the client end of the connection. The broker will have the remote end of the connection. This method only blocks long enough for the channel to be created, not for the TCP connection to happen. Any exception thrown while initializing the connection, such as UnresolvedAddressException, is rethrown wrapped in a RuntimeException.

Specified by:
newClientChannel in interface ChannelManager
Parameters:
host - The host name to connect to
port - The port to connect to
messageHandler - The message handler to use for all received messages
Returns:
The newly-created channel. The channel may only be safely accessed from the MessageHandler callback methods.
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.newClientChannel(java.lang.String, int, net.sf.xenqtt.message.MessageHandler)
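
The three newClientChannel overloads take the broker location as a URI string, a URI, or a host and port pair. The sketch below shows how the example URI tcp://q.m2m.io:1883 breaks down into a host and port using java.net.URI. BrokerAddress is a hypothetical helper, and whether the library parses URIs this way or requires an explicit port is an assumption.

```java
import java.net.URI;

// Hypothetical helper, not part of xenqtt.
class BrokerAddress {
    final String host;
    final int port;

    BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Splits a broker URI such as tcp://q.m2m.io:1883 into host and port.
    // Requiring both parts is an assumption of this sketch.
    static BrokerAddress parse(String brokerUri) {
        URI uri = URI.create(brokerUri);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(
                    "host and port are required: " + brokerUri);
        }
        return new BrokerAddress(uri.getHost(), uri.getPort());
    }
}
```

The resulting host and port are the values one would pass to the newClientChannel(String host, int port, MessageHandler messageHandler) overload.
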

newBrokerChannel

public MqttChannelRef newBrokerChannel(SocketChannel socketChannel,
                                       MessageHandler messageHandler)
                                throws MqttInterruptedException
Description copied from interface: ChannelManager
Create a new broker side MqttChannelRef for use in exchanging data using the MQTT protocol. This is the broker end of the connection. The client will have the remote end of the connection. If an exception is thrown the socketChannel will be closed. This method only blocks long enough for the channel to be created.

Specified by:
newBrokerChannel in interface ChannelManager
Parameters:
socketChannel - The channel to communicate over
messageHandler - The message handler to use for all received messages
Returns:
The newly-created channel. The channel may only be safely accessed from the MessageHandler callback methods.
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.newBrokerChannel(java.nio.channels.SocketChannel, net.sf.xenqtt.message.MessageHandler)

send

public <T extends MqttMessage> T send(MqttChannelRef channel,
                                      MqttMessage message)
                           throws MqttInterruptedException
Description copied from interface: ChannelManager
Send a message over a specified channel. This method only blocks until the message is queued to send to the channel.

Specified by:
send in interface ChannelManager
Parameters:
channel - The channel to send the message over. This channel should have been previously created via the newClientChannel(String, int, MessageHandler) or newBrokerChannel(SocketChannel, MessageHandler) methods
message - The message to send. This can be any type of MQTT message
Returns:
In a synchronous implementation this returns the ack message if the message being sent is a ConnectMessage or has a QoS > 0.
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.send(net.sf.xenqtt.message.MqttChannelRef, net.sf.xenqtt.message.MqttMessage)
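
The signature of send declares a type parameter T that appears only in the return type, so the caller chooses T by the type of the variable the result is assigned to. The sketch below illustrates the consequence using plain Java generics. Message, ConnAck, PubAck, and GenericReturn are hypothetical stand-ins, not library types, and the stand-in send simply returns the object it is given.

```java
// Stand-in message types, used only to show how T is inferred.
interface Message {}

class ConnAck implements Message {}

class PubAck implements Message {}

class GenericReturn {
    // Same shape as the documented signature: T is chosen by the caller,
    // so the cast cannot be verified at compile time.
    @SuppressWarnings("unchecked")
    static <T extends Message> T send(Message ack) {
        return (T) ack;
    }
}
```

Assigning the result to a ConnAck variable works when the returned object is a ConnAck. Assigning the same result to a PubAck variable compiles but throws ClassCastException at the call site, so the declared type should match the ack expected for the message being sent.
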

close

public void close(MqttChannelRef channel)
           throws MqttInterruptedException
Description copied from interface: ChannelManager
Closes the specified channel. This method blocks until the channel is closed.

Specified by:
close in interface ChannelManager
Throws:
MqttInterruptedException - Thrown when the calling thread is interrupted
See Also:
ChannelManager.close(net.sf.xenqtt.message.MqttChannelRef)

close

public void close(MqttChannelRef channel,
                  Throwable cause)
Description copied from interface: ChannelManager
Closes the specified channel and sends cause to the MessageHandler.channelClosed(MqttChannel, Throwable) callback. This method blocks until the channel is closed.

Specified by:
close in interface ChannelManager
See Also:
ChannelManager.close(net.sf.xenqtt.message.MqttChannelRef, java.lang.Throwable)

cancelBlockingCommands

public void cancelBlockingCommands(MqttChannelRef channel)
Description copied from interface: ChannelManager
Cancels all blocking commands for the specified channel. This is not done when the channel is closed because we may want to reconnect instead of releasing the commands.

Specified by:
cancelBlockingCommands in interface ChannelManager
See Also:
ChannelManager.cancelBlockingCommands(net.sf.xenqtt.message.MqttChannelRef)

getUnsentMessages

public List<MqttMessage> getUnsentMessages(MqttChannelRef channel)
Specified by:
getUnsentMessages in interface ChannelManager
Returns:
All messages that have not been sent. This includes messages queued to be sent, any partially sent message, and all in flight messages.
See Also:
ChannelManager.getUnsentMessages(net.sf.xenqtt.message.MqttChannelRef)

transfer

public void transfer(MqttChannelRef oldChannel,
                     MqttChannelRef newChannel)
Description copied from interface: ChannelManager
Transfers unsent messages from oldChannel to newChannel and changes oldChannel such that any messages sent to it will actually go to newChannel. This is used by reconnection logic to safely use a new connection in place of one that closed.

Specified by:
transfer in interface ChannelManager
See Also:
ChannelManager.transfer(net.sf.xenqtt.message.MqttChannelRef, net.sf.xenqtt.message.MqttChannelRef)
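
The behavior described for transfer (move unsent messages, then redirect later sends) can be pictured with a toy model. ToyChannel is hypothetical and is not how the library implements transfer; messages are plain strings, and appending transferred messages after anything already queued on the new channel is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only; it does not reflect the xenqtt implementation.
class ToyChannel {
    private final Deque<String> unsent = new ArrayDeque<>();
    private ToyChannel delegate; // set once this channel has been replaced

    void send(String message) {
        if (delegate != null) {
            delegate.send(message); // forwarded to the replacement channel
        } else {
            unsent.addLast(message);
        }
    }

    List<String> getUnsentMessages() {
        return delegate != null
                ? delegate.getUnsentMessages()
                : new ArrayList<>(unsent);
    }

    static void transfer(ToyChannel oldChannel, ToyChannel newChannel) {
        if (oldChannel == newChannel) {
            return; // nothing to move, and self-delegation would loop
        }
        // Move queued messages in their original order, then redirect.
        while (!oldChannel.unsent.isEmpty()) {
            newChannel.send(oldChannel.unsent.pollFirst());
        }
        oldChannel.delegate = newChannel;
    }
}
```

After transfer, code that still holds the old reference keeps working because its sends land on the new channel, which is the property the reconnection logic relies on.
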

detachChannel

public void detachChannel(MqttChannelRef channel)
                   throws MqttCommandCancelledException,
                          MqttTimeoutException,
                          MqttInterruptedException,
                          MqttInvocationException,
                          MqttInvocationError
Description copied from interface: ChannelManager
Detaches the specified channel from this manager's control. This is used in conjunction with attachChannel(MqttChannelRef, MessageHandler) to move handling of the channel's messages from one manager to another. This is always a synchronous operation. It is safe to call this from the ChannelManagerImpl IO thread.

Specified by:
detachChannel in interface ChannelManager
Parameters:
channel - The channel to detach
Throws:
MqttCommandCancelledException - The channel manager uses a command pattern to process this request on the IO thread. If the command is cancelled for some reason, like the channel closes, this exception is thrown.
MqttTimeoutException - Thrown when using a synchronous implementation and the timeout specified for a blocked method expires
MqttInterruptedException - Thrown when the calling thread is interrupted
MqttInvocationException - The channel manager uses a command pattern to process this request on the IO thread. Any Exception thrown while the command is being processed will be wrapped in an MqttInvocationException.
MqttInvocationError - The channel manager uses a command pattern to process this request on the IO thread. Any Error thrown while the command is being processed will be wrapped in an MqttInvocationError.
See Also:
ChannelManager.detachChannel(net.sf.xenqtt.message.MqttChannelRef)

attachChannel

public void attachChannel(MqttChannelRef channel,
                          MessageHandler messageHandler)
                   throws MqttCommandCancelledException,
                          MqttTimeoutException,
                          MqttInterruptedException,
                          MqttInvocationException,
                          MqttInvocationError
Description copied from interface: ChannelManager
Attaches the specified channel to this manager's control. This is used in conjunction with ChannelManager.detachChannel(MqttChannelRef) to move handling of the channel's messages from one manager to another. This is always a synchronous operation.

Specified by:
attachChannel in interface ChannelManager
Parameters:
channel - The channel to attach
messageHandler - The message handler to use for all received messages
Throws:
MqttCommandCancelledException - The channel manager uses a command pattern to process this request on the IO thread. If the command is cancelled for some reason, like the channel closes, this exception is thrown.
MqttTimeoutException - Thrown when using a synchronous implementation and the timeout specified for a blocked method expires
MqttInterruptedException - Thrown when the calling thread is interrupted
MqttInvocationException - The channel manager uses a command pattern to process this request on the IO thread. Any Exception thrown while the command is being processed will be wrapped in an MqttInvocationException.
MqttInvocationError - The channel manager uses a command pattern to process this request on the IO thread. Any Error thrown while the command is being processed will be wrapped in an MqttInvocationError.
See Also:
ChannelManager.attachChannel(net.sf.xenqtt.message.MqttChannelRef, net.sf.xenqtt.message.MessageHandler)

getStats

public MessageStats getStats(boolean reset)
Description copied from interface: ChannelManager
Request the current snapshot of statistics that are available from this channel manager.

Specified by:
getStats in interface ChannelManager
Parameters:
reset - If true, all counters that can be reset are reset after the statistics are acquired. This is useful when statistics are gathered and reported at regular intervals and each report should cover only the previous interval.
Returns:
The statistics for the MQTT client
See Also:
ChannelManager.getStats(boolean)
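
The reset flag supports interval reporting: each call returns the counts accumulated since the previous reset. A minimal sketch of that idea with a single counter follows. IntervalCounter is hypothetical and does not model MessageStats or its fields.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy counter, not part of xenqtt.
class IntervalCounter {
    private final AtomicLong messagesSent = new AtomicLong();

    void recordSend() {
        messagesSent.incrementAndGet();
    }

    // Returns the current count. When reset is true the counter is zeroed
    // in the same atomic step, so no increment is lost between the read
    // and the reset.
    long snapshot(boolean reset) {
        return reset ? messagesSent.getAndSet(0) : messagesSent.get();
    }
}
```

A reporter that calls snapshot(true) once per interval sees only the sends recorded during that interval, while snapshot(false) leaves the running total untouched.
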


Copyright © 2013. All Rights Reserved.