Class MqttBrokerConnection
java.lang.Object
org.openhab.core.io.transport.mqtt.MqttBrokerConnection
An MQTTBrokerConnection represents a single client connection to a MQTT broker.
When a connection to an MQTT broker is lost, it will try to reconnect every 60 seconds.
- Author:
- Davy Vanherbergen - Initial contribution, David Graeff - All operations are async now. More flexible sslContextProvider and reconnectStrategy added., Markus Rathgeb - added connection state callback, Jan N. Klug - changed from PAHO to HiveMQ client, Mark Herwege - Added flag for hostname validation
-
Nested Class Summary
Modifier and TypeClassDescriptionclass
Create a listener object for being used as a callback for a connection attempt.static enum
MQTT version (currently v3 and v5)static enum
MQTT transport protocols -
Field Summary
Modifier and TypeFieldDescriptionprotected @Nullable org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper
protected final String
protected MqttBrokerConnection.ConnectionCallback
protected final List<MqttConnectionObserver>
static final int
static final MqttBrokerConnection.MqttVersion
static final MqttBrokerConnection.Protocol
static final int
protected final String
protected final boolean
protected boolean
protected final MqttBrokerConnection.MqttVersion
protected final int
protected final MqttBrokerConnection.Protocol
protected @Nullable AbstractReconnectStrategy
protected final boolean
protected @Nullable ScheduledExecutorService
protected final AtomicReference<@Nullable ScheduledFuture<?>>
-
Constructor Summary
ConstructorDescriptionMqttBrokerConnection
(String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId) Create a new TCP MQTT3 client connection to a server with the given host and port.MqttBrokerConnection
(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) Create a new TCP MQTT3 client connection to a server with the given host and port.MqttBrokerConnection
(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId) Create a new MQTT client connection to a server with the given protocol, host and port.MqttBrokerConnection
(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, @Nullable String clientId) Create a new MQTT client connection to a server with the given protocol, host and port. -
Method Summary
Modifier and TypeMethodDescriptionvoid
addConnectionObserver
(MqttConnectionObserver connectionObserver) Add a new connection observer to this connection.protected void
The connection process is limited by a timeout, realized with aCompletableFuture
.Returns the connection stateprotected org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper
protected boolean
finalizeStopAfterDisconnect
(boolean v) After a successful disconnect, the underlying library objects need to be closed and connection observers want to be notified.Get client id to use when connecting to the broker.getHost()
Get the MQTT broker hostint
Return the keep alive internal in seconds@Nullable MqttWillAndTestament
Return the last will object or null if there is none.Get the MQTT version@Nullable String
int
getPort()
Get the MQTT broker portGet the MQTT broker protocolint
getQos()
@Nullable AbstractReconnectStrategy
@Nullable String
getUser()
boolean
Return true if there are connection observers registered via addConnectionObserver().boolean
Return true if there are subscribers registered viasubscribe(String, MqttMessageSubscriber)
.boolean
Return true if hostname in certificate is validated against server hostname for secure connectionboolean
isSecure()
Return true if this is or will be an encrypted connection to the brokerPublish a message to the broker with the given QoS and retained flag.void
removeConnectionObserver
(MqttConnectionObserver connectionObserver) Remove a previously registered connection observer from this connection.void
setCredentials
(@Nullable String user, @Nullable String password) Set the optional user name and optional password to use when connecting to the MQTT broker.void
setKeepAliveInterval
(int keepAliveInterval) Set the keep alive interval.void
setLastWill
(@Nullable MqttWillAndTestament lastWill) Set the last will object.void
setLastWill
(@Nullable MqttWillAndTestament lastWill, boolean applyImmediately) Set the last will object.void
setQos
(int qos) Set quality of service.void
setReconnectStrategy
(AbstractReconnectStrategy reconnectStrategy) Set the reconnect strategy.void
setTimeoutExecutor
(@Nullable ScheduledExecutorService executor, int timeoutInMS) Set a timeout executor.void
setTrustManagers
(TrustManager[] trustManagers) void
setUnsubscribeOnStop
(boolean unsubscribeOnStop) Enable / disable sending Unsubscribe command when the connection is closed Some servers can be quirky, then do not handle Usubscribe request properly.start()
This will establish a connection to the MQTT broker and if successful, notify all publishers and subscribers that the connection has become active.stop()
Unsubscribes from all subscribed topics, stops the reconnect strategy, disconnect and close the client.subscribe
(String topic, MqttMessageSubscriber subscriber) Add a new message consumer to this connection.protected CompletableFuture<Boolean>
subscribeRaw
(String topic, org.openhab.core.io.transport.mqtt.internal.Subscription subscription) Subscribes to a topic on the given connection, but does not alter the subscriber list.unsubscribe
(String topic, MqttMessageSubscriber subscriber) Remove a previously registered consumer from this connection.Unsubscribe from all topicsprotected CompletableFuture<Boolean>
unsubscribeRaw
(org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper client, String topic) Unsubscribes from a topic on the given connection, but does not alter the subscriber list.
-
Field Details
-
DEFAULT_PROTOCOL
-
DEFAULT_MQTT_VERSION
-
DEFAULT_KEEPALIVE_INTERVAL
public static final int DEFAULT_KEEPALIVE_INTERVAL- See Also:
-
DEFAULT_QOS
public static final int DEFAULT_QOS- See Also:
-
protocol
-
host
-
port
protected final int port -
secure
protected final boolean secure -
hostnameValidated
protected final boolean hostnameValidated -
mqttVersion
-
clientId
-
reconnectStrategy
-
client
protected @Nullable org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper client -
isConnecting
protected boolean isConnecting -
connectionObservers
-
subscribers
-
timeoutFuture
-
timeoutExecutor
-
connectionCallback
-
-
Constructor Details
-
MqttBrokerConnection
public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, @Nullable String clientId) Create a new TCP MQTT3 client connection to a server with the given host and port.- Parameters:
host
- A host name or addressport
- A port or null to select the default port for a secure or insecure connectionsecure
- A secure connectionclientId
- Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.- Throws:
IllegalArgumentException
- If the client id or port is not valid.
-
MqttBrokerConnection
public MqttBrokerConnection(String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId) Create a new TCP MQTT3 client connection to a server with the given host and port.- Parameters:
host
- A host name or addressport
- A port or null to select the default port for a secure or insecure connectionsecure
- A secure connectionhostnameValidated
- Validate hostname from certificate against server hostname for secure connectionclientId
- Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.- Throws:
IllegalArgumentException
- If the client id or port is not valid.
-
MqttBrokerConnection
public MqttBrokerConnection(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, @Nullable String clientId) Create a new MQTT client connection to a server with the given protocol, host and port.- Parameters:
protocol
- The transport protocolmqttVersion
- The version of the MQTT client (v3 or v5)host
- A host name or addressport
- A port or null to select the default port for a secure or insecure connectionsecure
- A secure connectionclientId
- Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.- Throws:
IllegalArgumentException
- If the client id or port is not valid.
-
MqttBrokerConnection
public MqttBrokerConnection(MqttBrokerConnection.Protocol protocol, MqttBrokerConnection.MqttVersion mqttVersion, String host, @Nullable Integer port, boolean secure, boolean hostnameValidated, @Nullable String clientId) Create a new MQTT client connection to a server with the given protocol, host and port.- Parameters:
protocol
- The transport protocolmqttVersion
- The version of the MQTT client (v3 or v5)host
- A host name or addressport
- A port or null to select the default port for a secure or insecure connectionsecure
- A secure connectionhostnameValidated
- Validate hostname from certificate against server hostname for secure connectionclientId
- Client id. Each client on a MQTT server has a unique client id. Sometimes client ids are used for access restriction implementations. If none is specified, a default is generated. The client id cannot be longer than 65535 characters.- Throws:
IllegalArgumentException
- If the client id or port is not valid.
-
-
Method Details
-
setReconnectStrategy
Set the reconnect strategy. The implementor will be called when the connection state to the MQTT broker changed. The reconnect strategy will not be informed if the initial connection to the broker timed out. You need a timeout executor additionally, seesetTimeoutExecutor(ScheduledExecutorService, int)
.- Parameters:
reconnectStrategy
- The reconnect strategy. May not be null.
-
getReconnectStrategy
- Returns:
- Return the reconnect strategy
-
setTimeoutExecutor
Set a timeout executor. If none is set, you will not be notified of connection timeouts, this also includes a non-firing reconnect strategy. The default executor is none.- Parameters:
executor
- One timer will be created when a connection attempt happenstimeoutInMS
- Timeout in milliseconds
-
setTrustManagers
-
getTrustManagers
-
getProtocol
Get the MQTT broker protocol -
getMqttVersion
Get the MQTT version -
getHost
Get the MQTT broker host -
getPort
public int getPort()Get the MQTT broker port -
isSecure
public boolean isSecure()Return true if this is or will be an encrypted connection to the broker -
isHostnameValidated
public boolean isHostnameValidated()Return true if hostname in certificate is validated against server hostname for secure connection -
setCredentials
Set the optional user name and optional password to use when connecting to the MQTT broker. The connection needs to be restarted for the new settings to take effect.- Parameters:
user
- Name to use for connection.password
- The password
-
getPassword
- Returns:
- connection password.
-
getUser
- Returns:
- optional user name for the MQTT connection.
-
getQos
public int getQos()- Returns:
- quality of service level.
-
setQos
public void setQos(int qos) Set quality of service. Valid values are 0, 1, 2 and mean "at most once", "at least once" and "exactly once" respectively. The connection needs to be restarted for the new settings to take effect.- Parameters:
qos
- level.
-
getLastWill
Return the last will object or null if there is none. -
setLastWill
public void setLastWill(@Nullable MqttWillAndTestament lastWill, boolean applyImmediately) throws org.osgi.service.cm.ConfigurationException, MqttException Set the last will object.- Parameters:
lastWill
- The last will object or null.applyImmediately
- If true, the connection will stopped and started for the new last-will to take effect immediately.- Throws:
MqttException
org.osgi.service.cm.ConfigurationException
-
setLastWill
Set the last will object. The connection needs to be restarted for the new settings to take effect.- Parameters:
lastWill
- The last will object or null.
-
setUnsubscribeOnStop
public void setUnsubscribeOnStop(boolean unsubscribeOnStop) Enable / disable sending Unsubscribe command when the connection is closed Some servers can be quirky, then do not handle Usubscribe request properly. In this case we have to omit sending it. Example: iRobot built-in MQTT server. By default this behavior is set to true.- Parameters:
unsubscribeOnStop
- Enable or disable flag.
-
getClientId
Get client id to use when connecting to the broker.- Returns:
- value clientId to use.
-
connectionState
Returns the connection state -
setKeepAliveInterval
public void setKeepAliveInterval(int keepAliveInterval) Set the keep alive interval. The default interval is 60 seconds. If no heartbeat is received within this timeframe, the connection will be considered dead. Set this to a higher value on systems which may not always be able to process the heartbeat in time.- Parameters:
keepAliveInterval
- interval in seconds
-
getKeepAliveInterval
public int getKeepAliveInterval()Return the keep alive internal in seconds -
hasSubscribers
public boolean hasSubscribers()Return true if there are subscribers registered viasubscribe(String, MqttMessageSubscriber)
. Callunsubscribe(String, MqttMessageSubscriber)
orunsubscribeAll()
if necessary. -
subscribe
Add a new message consumer to this connection. Multiple subscribers with the same topic are allowed. This method will not protect you from adding a subscriber object multiple times! If there is a retained message for the topic, you are guaranteed to receive a callback for each new subscriber, even for the same topic.- Parameters:
topic
- The topic to subscribe to.subscriber
- The callback listener for received messages for the given topic.- Returns:
- Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
-
subscribeRaw
protected CompletableFuture<Boolean> subscribeRaw(String topic, org.openhab.core.io.transport.mqtt.internal.Subscription subscription) Subscribes to a topic on the given connection, but does not alter the subscriber list.- Parameters:
topic
- The topic to subscribe to.- Returns:
- Completes with true if successful. Exceptionally otherwise.
-
unsubscribe
Remove a previously registered consumer from this connection. If no more consumers are registered for a topic, the topic will be unsubscribed from.- Parameters:
topic
- The topic to unsubscribe from.subscriber
- The callback listener to remove.- Returns:
- Completes with true if successful. Exceptionally otherwise.
-
unsubscribeRaw
protected CompletableFuture<Boolean> unsubscribeRaw(org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper client, String topic) Unsubscribes from a topic on the given connection, but does not alter the subscriber list.- Parameters:
client
- The client connectiontopic
- The topic to unsubscribe from- Returns:
- Completes with true if successful. Completes with false if no broker connection is established. Exceptionally otherwise.
-
addConnectionObserver
Add a new connection observer to this connection.- Parameters:
connectionObserver
- The connection observer that should be added.
-
removeConnectionObserver
Remove a previously registered connection observer from this connection.- Parameters:
connectionObserver
- The connection observer that should be removed.
-
hasConnectionObservers
public boolean hasConnectionObservers()Return true if there are connection observers registered via addConnectionObserver(). -
start
This will establish a connection to the MQTT broker and if successful, notify all publishers and subscribers that the connection has become active. This method will do nothing if there is already an active connection.- Returns:
- Returns a future that completes with true if already connected or connecting, completes with false if a connection timeout has happened and completes exceptionally otherwise.
-
createClient
protected org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper createClient() -
finalizeStopAfterDisconnect
protected boolean finalizeStopAfterDisconnect(boolean v) After a successful disconnect, the underlying library objects need to be closed and connection observers want to be notified.- Parameters:
v
- A passthrough boolean value- Returns:
- Returns the value of the parameter v.
-
unsubscribeAll
Unsubscribe from all topics- Returns:
- Returns a future that completes as soon as all subscriptions have been canceled.
-
stop
Unsubscribes from all subscribed topics, stops the reconnect strategy, disconnect and close the client. You can re-establish a connection callingstart()
again. Do not call start, before the closing process has finished completely.- Returns:
- Returns a future that completes as soon as the disconnect process has finished.
-
publish
Publish a message to the broker with the given QoS and retained flag.- Parameters:
topic
- The topicpayload
- The message payloadqos
- The quality of service for this messageretain
- Set to true to retain the message on the broker- Returns:
- Returns a future that completes with a result of true if the publishing succeeded and completes exceptionally on an error or with a result of false if no broker connection is established.
-
cancelTimeoutFuture
protected void cancelTimeoutFuture()The connection process is limited by a timeout, realized with aCompletableFuture
. Cancel that future now, if it exists.
-