Bump paho-mqtt from 1.6.1 to 2.0.0 (#2288)

* Bump paho-mqtt from 1.6.1 to 2.0.0

Bumps [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) from 1.6.1 to 2.0.0.
- [Release notes](https://github.com/eclipse/paho.mqtt.python/releases)
- [Changelog](https://github.com/eclipse/paho.mqtt.python/blob/master/ChangeLog.txt)
- [Commits](https://github.com/eclipse/paho.mqtt.python/compare/v1.6.1...v2.0.0)

---
updated-dependencies:
- dependency-name: paho-mqtt
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update paho-mqtt==2.0.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2024-03-30 15:27:16 -07:00 committed by GitHub
parent 75a1750a4e
commit f82aecb88c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 2643 additions and 1443 deletions

View file

@ -1,4 +1,4 @@
__version__ = "1.6.1"
__version__ = "2.0.0"
class MQTTException(Exception):

File diff suppressed because it is too large Load diff

113
lib/paho/mqtt/enums.py Normal file
View file

@ -0,0 +1,113 @@
import enum
class MQTTErrorCode(enum.IntEnum):
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
MQTT_ERR_INVAL = 3
MQTT_ERR_NO_CONN = 4
MQTT_ERR_CONN_REFUSED = 5
MQTT_ERR_NOT_FOUND = 6
MQTT_ERR_CONN_LOST = 7
MQTT_ERR_TLS = 8
MQTT_ERR_PAYLOAD_SIZE = 9
MQTT_ERR_NOT_SUPPORTED = 10
MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
MQTT_ERR_QUEUE_SIZE = 15
MQTT_ERR_KEEPALIVE = 16
class MQTTProtocolVersion(enum.IntEnum):
MQTTv31 = 3
MQTTv311 = 4
MQTTv5 = 5
class CallbackAPIVersion(enum.Enum):
"""Defined the arguments passed to all user-callback.
See each callbacks for details: `on_connect`, `on_connect_fail`, `on_disconnect`, `on_message`, `on_publish`,
`on_subscribe`, `on_unsubscribe`, `on_log`, `on_socket_open`, `on_socket_close`,
`on_socket_register_write`, `on_socket_unregister_write`
"""
VERSION1 = 1
"""The version used with paho-mqtt 1.x before introducing CallbackAPIVersion.
This version had different arguments depending if MQTTv5 or MQTTv3 was used. `Properties` & `ReasonCode` were missing
on some callback (apply only to MQTTv5).
This version is deprecated and will be removed in version 3.0.
"""
VERSION2 = 2
""" This version fix some of the shortcoming of previous version.
Callback have the same signature if using MQTTv5 or MQTTv3. `ReasonCode` are used in MQTTv3.
"""
class MessageType(enum.IntEnum):
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
PUBREC = 0x50
PUBREL = 0x60
PUBCOMP = 0x70
SUBSCRIBE = 0x80
SUBACK = 0x90
UNSUBSCRIBE = 0xA0
UNSUBACK = 0xB0
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0
AUTH = 0xF0
class LogLevel(enum.IntEnum):
MQTT_LOG_INFO = 0x01
MQTT_LOG_NOTICE = 0x02
MQTT_LOG_WARNING = 0x04
MQTT_LOG_ERR = 0x08
MQTT_LOG_DEBUG = 0x10
class ConnackCode(enum.IntEnum):
CONNACK_ACCEPTED = 0
CONNACK_REFUSED_PROTOCOL_VERSION = 1
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5
class _ConnectionState(enum.Enum):
MQTT_CS_NEW = enum.auto()
MQTT_CS_CONNECT_ASYNC = enum.auto()
MQTT_CS_CONNECTING = enum.auto()
MQTT_CS_CONNECTED = enum.auto()
MQTT_CS_CONNECTION_LOST = enum.auto()
MQTT_CS_DISCONNECTING = enum.auto()
MQTT_CS_DISCONNECTED = enum.auto()
class MessageState(enum.IntEnum):
MQTT_MS_INVALID = 0
MQTT_MS_PUBLISH = 1
MQTT_MS_WAIT_FOR_PUBACK = 2
MQTT_MS_WAIT_FOR_PUBREC = 3
MQTT_MS_RESEND_PUBREL = 4
MQTT_MS_WAIT_FOR_PUBREL = 5
MQTT_MS_RESEND_PUBCOMP = 6
MQTT_MS_WAIT_FOR_PUBCOMP = 7
MQTT_MS_SEND_PUBREC = 8
MQTT_MS_QUEUED = 9
class PahoClientMode(enum.IntEnum):
MQTT_CLIENT = 0
MQTT_BRIDGE = 1

View file

@ -1,4 +1,4 @@
class MQTTMatcher(object):
class MQTTMatcher:
"""Intended to manage topic filters including wildcards.
Internally, MQTTMatcher use a prefix tree (trie) to store
@ -6,7 +6,7 @@ class MQTTMatcher(object):
method to iterate efficiently over all filters that match
some topic name."""
class Node(object):
class Node:
__slots__ = '_children', '_content'
def __init__(self):
@ -33,8 +33,8 @@ class MQTTMatcher(object):
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
except KeyError as ke:
raise KeyError(key) from ke
def __delitem__(self, key):
"""Delete the value associated with some topic filter :key"""
@ -46,8 +46,8 @@ class MQTTMatcher(object):
lst.append((parent, k, node))
# TODO
node._content = None
except KeyError:
raise KeyError(key)
except KeyError as ke:
raise KeyError(key) from ke
else: # cleanup
for parent, k, node in reversed(lst):
if node._children or node._content is not None:

View file

@ -7,7 +7,7 @@
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
http://www.eclipse.org/legal/epl-v20.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
@ -37,7 +37,7 @@ class PacketTypes:
# Dummy packet type for properties use - will delay only applies to will
WILLMESSAGE = 99
Names = [ "reserved", \
Names = ( "reserved", \
"Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
"Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
"Pingreq", "Pingresp", "Disconnect", "Auth"]
"Pingreq", "Pingresp", "Disconnect", "Auth")

View file

@ -1,23 +1,20 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
# *******************************************************************
# Copyright (c) 2017, 2019 IBM Corp.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v20.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Ian Craggs - initial implementation and/or documentation
# *******************************************************************
import struct
import sys
from .packettypes import PacketTypes
@ -52,10 +49,8 @@ def readInt32(buf):
def writeUTF(data):
# data could be a string, or bytes. If string, encode into bytes with utf-8
if sys.version_info[0] < 3:
data = bytearray(data, 'utf-8')
else:
data = data if type(data) == type(b"") else bytes(data, "utf-8")
if not isinstance(data, bytes):
data = bytes(data, "utf-8")
return writeInt16(len(data)) + data
@ -100,19 +95,17 @@ class VariableByteIntegers: # Variable Byte Integer
def encode(x):
"""
Convert an integer 0 <= x <= 268435455 into multi-byte format.
Returns the buffer convered from the integer.
Returns the buffer converted from the integer.
"""
assert 0 <= x <= 268435455
if not 0 <= x <= 268435455:
raise ValueError(f"Value {x!r} must be in range 0-268435455")
buffer = b''
while 1:
digit = x % 128
x //= 128
if x > 0:
digit |= 0x80
if sys.version_info[0] >= 3:
buffer += bytes([digit])
else:
buffer += bytes(chr(digit))
if x == 0:
break
return buffer
@ -139,14 +132,14 @@ class VariableByteIntegers: # Variable Byte Integer
return (value, bytes)
class Properties(object):
class Properties:
"""MQTT v5.0 properties class.
See Properties.names for a list of accepted property names along with their numeric values.
See Properties.properties for the data type of each property.
Example of use:
Example of use::
publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.UserProperty = ("a", "2")
@ -264,37 +257,33 @@ class Properties(object):
# the name could have spaces in, or not. Remove spaces before assignment
if name not in [aname.replace(' ', '') for aname in self.names.keys()]:
raise MQTTException(
"Property name must be one of "+str(self.names.keys()))
f"Property name must be one of {self.names.keys()}")
# check that this attribute applies to the packet type
if self.packetType not in self.properties[self.getIdentFromName(name)][1]:
raise MQTTException("Property %s does not apply to packet type %s"
% (name, PacketTypes.Names[self.packetType]))
raise MQTTException(f"Property {name} does not apply to packet type {PacketTypes.Names[self.packetType]}")
# Check for forbidden values
if type(value) != type([]):
if not isinstance(value, list):
if name in ["ReceiveMaximum", "TopicAlias"] \
and (value < 1 or value > 65535):
raise MQTTException(
"%s property value must be in the range 1-65535" % (name))
raise MQTTException(f"{name} property value must be in the range 1-65535")
elif name in ["TopicAliasMaximum"] \
and (value < 0 or value > 65535):
raise MQTTException(
"%s property value must be in the range 0-65535" % (name))
raise MQTTException(f"{name} property value must be in the range 0-65535")
elif name in ["MaximumPacketSize", "SubscriptionIdentifier"] \
and (value < 1 or value > 268435455):
raise MQTTException(
"%s property value must be in the range 1-268435455" % (name))
raise MQTTException(f"{name} property value must be in the range 1-268435455")
elif name in ["RequestResponseInformation", "RequestProblemInformation", "PayloadFormatIndicator"] \
and (value != 0 and value != 1):
raise MQTTException(
"%s property value must be 0 or 1" % (name))
f"{name} property value must be 0 or 1")
if self.allowsMultiple(name):
if type(value) != type([]):
if not isinstance(value, list):
value = [value]
if hasattr(self, name):
value = object.__getattribute__(self, name) + value
@ -308,8 +297,7 @@ class Properties(object):
if hasattr(self, compressedName):
if not first:
buffer += ", "
buffer += compressedName + " : " + \
str(getattr(self, compressedName))
buffer += f"{compressedName} : {getattr(self, compressedName)}"
first = False
buffer += "]"
return buffer
@ -345,9 +333,6 @@ class Properties(object):
buffer = b""
buffer += VariableByteIntegers.encode(identifier) # identifier
if type == self.types.index("Byte"): # value
if sys.version_info[0] < 3:
buffer += chr(value)
else:
buffer += bytes([value])
elif type == self.types.index("Two Byte Integer"):
buffer += writeInt16(value)
@ -412,8 +397,6 @@ class Properties(object):
return rc
def unpack(self, buffer):
if sys.version_info[0] < 3:
buffer = bytearray(buffer)
self.clear()
# deserialize properties into attributes from buffer received from network
propslen, VBIlen = VariableByteIntegers.decode(buffer)
@ -433,6 +416,6 @@ class Properties(object):
compressedName = propname.replace(' ', '')
if not self.allowsMultiple(compressedName) and hasattr(self, compressedName):
raise MQTTException(
"Property '%s' must not exist more than once" % property)
f"Property '{property}' must not exist more than once")
setattr(self, propname, value)
return self, propslen + VBIlen

View file

@ -5,7 +5,7 @@
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# http://www.eclipse.org/legal/epl-v20.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
@ -18,20 +18,58 @@ of messages in a one-shot manner. In other words, they are useful for the
situation where you have a single/multiple messages you want to publish to a
broker, then disconnect and nothing else is required.
"""
from __future__ import absolute_import
from __future__ import annotations
import collections
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
from typing import TYPE_CHECKING, Any, List, Tuple, Union
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCode
from .. import mqtt
from . import client as paho
if TYPE_CHECKING:
try:
from typing import NotRequired, Required, TypedDict # type: ignore
except ImportError:
from typing_extensions import NotRequired, Required, TypedDict
def _do_publish(client):
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # type: ignore
class AuthParameter(TypedDict, total=False):
username: Required[str]
password: NotRequired[str]
class TLSParameter(TypedDict, total=False):
ca_certs: Required[str]
certfile: NotRequired[str]
keyfile: NotRequired[str]
tls_version: NotRequired[int]
ciphers: NotRequired[str]
insecure: NotRequired[bool]
class MessageDict(TypedDict, total=False):
topic: Required[str]
payload: NotRequired[paho.PayloadType]
qos: NotRequired[int]
retain: NotRequired[bool]
MessageTuple = Tuple[str, paho.PayloadType, int, bool]
MessagesList = List[Union[MessageDict, MessageTuple]]
def _do_publish(client: paho.Client):
"""Internal function"""
message = client._userdata.popleft()
@ -44,21 +82,18 @@ def _do_publish(client):
raise TypeError('message must be a dict, tuple, or list')
def _on_connect(client, userdata, flags, rc):
"""Internal callback"""
#pylint: disable=invalid-name, unused-argument
if rc == 0:
def _on_connect(client: paho.Client, userdata: MessagesList, flags, reason_code, properties):
"""Internal v5 callback"""
if reason_code == 0:
if len(userdata) > 0:
_do_publish(client)
else:
raise mqtt.MQTTException(paho.connack_string(rc))
raise mqtt.MQTTException(paho.connack_string(reason_code))
def _on_connect_v5(client, userdata, flags, rc, properties):
"""Internal v5 callback"""
_on_connect(client, userdata, flags, rc)
def _on_publish(client, userdata, mid):
def _on_publish(
client: paho.Client, userdata: collections.deque[MessagesList], mid: int, reason_codes: ReasonCode, properties: Properties,
) -> None:
"""Internal callback"""
#pylint: disable=unused-argument
@ -68,16 +103,26 @@ def _on_publish(client, userdata, mid):
_do_publish(client)
def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=paho.MQTTv311,
transport="tcp", proxy_args=None):
def multiple(
msgs: MessagesList,
hostname: str = "localhost",
port: int = 1883,
client_id: str = "",
keepalive: int = 60,
will: MessageDict | None = None,
auth: AuthParameter | None = None,
tls: TLSParameter | None = None,
protocol: int = paho.MQTTv311,
transport: Literal["tcp", "websockets"] = "tcp",
proxy_args: Any | None = None,
) -> None:
"""Publish multiple messages to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a
list of messages. Once the messages have been delivered, it disconnects
cleanly from the broker.
msgs : a list of messages to publish. Each message is either a dict or a
:param msgs: a list of messages to publish. Each message is either a dict or a
tuple.
If a dict, only the topic must be present. Default values will be
@ -94,30 +139,30 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
If a tuple, then it must be of the form:
("<topic>", "<payload>", qos, retain)
hostname : a string containing the address of the broker to connect to.
:param str hostname: the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
:param int port: the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
:param str client_id: the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
:param int keepalive: the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
:param will: a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
:param auth: a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
:param tls: a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
@ -128,23 +173,28 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
:param str transport: set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
proxy_args: a dictionary that will be given to the client.
:param proxy_args: a dictionary that will be given to the client.
"""
if not isinstance(msgs, Iterable):
raise TypeError('msgs must be an iterable')
if len(msgs) == 0:
raise ValueError('msgs is empty')
client = paho.Client(
CallbackAPIVersion.VERSION2,
client_id=client_id,
userdata=collections.deque(msgs),
protocol=protocol,
transport=transport,
)
client = paho.Client(client_id=client_id, userdata=collections.deque(msgs),
protocol=protocol, transport=transport)
client.enable_logger()
client.on_publish = _on_publish
if protocol == mqtt.client.MQTTv5:
client.on_connect = _on_connect_v5
else:
client.on_connect = _on_connect
client.on_connect = _on_connect # type: ignore
if proxy_args is not None:
client.proxy_set(**proxy_args)
@ -164,7 +214,8 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
if tls is not None:
if isinstance(tls, dict):
insecure = tls.pop('insecure', False)
client.tls_set(**tls)
# mypy don't get that tls no longer contains the key insecure
client.tls_set(**tls) # type: ignore[misc]
if insecure:
# Must be set *after* the `client.tls_set()` call since it sets
# up the SSL context that `client.tls_insecure_set` alters.
@ -177,49 +228,62 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
client.loop_forever()
def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):
def single(
topic: str,
payload: paho.PayloadType = None,
qos: int = 0,
retain: bool = False,
hostname: str = "localhost",
port: int = 1883,
client_id: str = "",
keepalive: int = 60,
will: MessageDict | None = None,
auth: AuthParameter | None = None,
tls: TLSParameter | None = None,
protocol: int = paho.MQTTv311,
transport: Literal["tcp", "websockets"] = "tcp",
proxy_args: Any | None = None,
) -> None:
"""Publish a single message to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a
single message. Once the message has been delivered, it disconnects cleanly
from the broker.
topic : the only required argument must be the topic string to which the
:param str topic: the only required argument must be the topic string to which the
payload will be published.
payload : the payload to be published. If "" or None, a zero length payload
:param payload: the payload to be published. If "" or None, a zero length payload
will be published.
qos : the qos to use when publishing, default to 0.
:param int qos: the qos to use when publishing, default to 0.
retain : set the message to be retained (True) or not (False).
:param bool retain: set the message to be retained (True) or not (False).
hostname : a string containing the address of the broker to connect to.
:param str hostname: the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
:param int port: the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
:param str client_id: the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
:param int keepalive: the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
:param will: a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
:param auth: a dict containing authentication parameters for the client:
Username is required, password is optional and will default to None
auth = {'username':"<username>", 'password':"<password>"}
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
:param tls: a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
@ -230,12 +294,13 @@ def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
Alternatively, tls input can be an SSLContext object, which will be
processed using the tls_set_context method.
transport : set to "tcp" to use the default setting of transport which is
:param transport: set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
proxy_args: a dictionary that will be given to the client.
:param proxy_args: a dictionary that will be given to the client.
"""
msg = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain}
msg: MessageDict = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain}
multiple([msg], hostname, port, client_id, keepalive, will, auth, tls,
protocol, transport, proxy_args)

0
lib/paho/mqtt/py.typed Normal file
View file

View file

@ -1,30 +1,31 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
# *******************************************************************
# Copyright (c) 2017, 2019 IBM Corp.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v2.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v20.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Ian Craggs - initial implementation and/or documentation
# *******************************************************************
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
import sys
import functools
import warnings
from typing import Any
from .packettypes import PacketTypes
class ReasonCodes:
@functools.total_ordering
class ReasonCode:
"""MQTT version 5.0 reason codes class.
See ReasonCodes.names for a list of possible numeric values along with their
See ReasonCode.names for a list of possible numeric values along with their
names and the packets to which they apply.
"""
@ -135,10 +136,12 @@ class ReasonCodes:
Used when displaying the reason code.
"""
assert identifier in self.names.keys(), identifier
if identifier not in self.names:
raise KeyError(identifier)
names = self.names[identifier]
namelist = [name for name in names.keys() if packetType in names[name]]
assert len(namelist) == 1
if len(namelist) != 1:
raise ValueError(f"Expected exactly one name, found {namelist!r}")
return namelist[0]
def getId(self, name):
@ -148,22 +151,17 @@ class ReasonCodes:
Used when setting the reason code for a packetType
check that only valid codes for the packet are set.
"""
identifier = None
for code in self.names.keys():
if name in self.names[code].keys():
if self.packetType in self.names[code][name]:
identifier = code
break
assert identifier is not None, name
return identifier
return code
raise KeyError(f"Reason code name not found: {name}")
def set(self, name):
self.value = self.getId(name)
def unpack(self, buffer):
c = buffer[0]
if sys.version_info[0] < 3:
c = ord(c)
name = self.__getName__(self.packetType, c)
self.value = self.getId(name)
return 1
@ -177,11 +175,26 @@ class ReasonCodes:
if isinstance(other, int):
return self.value == other
if isinstance(other, str):
return self.value == str(self)
if isinstance(other, ReasonCodes):
return other == str(self)
if isinstance(other, ReasonCode):
return self.value == other.value
return False
def __lt__(self, other):
if isinstance(other, int):
return self.value < other
if isinstance(other, ReasonCode):
return self.value < other.value
return NotImplemented
def __repr__(self):
try:
packet_name = PacketTypes.Names[self.packetType]
except IndexError:
packet_name = "Unknown"
return f"ReasonCode({packet_name}, {self.getName()!r})"
def __str__(self):
return self.getName()
@ -190,3 +203,21 @@ class ReasonCodes:
def pack(self):
return bytearray([self.value])
@property
def is_failure(self) -> bool:
return self.value >= 0x80
class _CompatibilityIsInstance(type):
def __instancecheck__(self, other: Any) -> bool:
return isinstance(other, ReasonCode)
class ReasonCodes(ReasonCode, metaclass=_CompatibilityIsInstance):
def __init__(self, *args, **kwargs):
warnings.warn("ReasonCodes is deprecated, use ReasonCode (singular) instead",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)

View file

@ -5,7 +5,7 @@
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# http://www.eclipse.org/legal/epl-v20.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
@ -18,16 +18,15 @@ to topics and retrieving messages. The two functions are simple(), which
returns one or messages matching a set of topics, and callback() which allows
you to pass a callback for processing of messages.
"""
from __future__ import absolute_import
from .. import mqtt
from . import client as paho
def _on_connect_v5(client, userdata, flags, rc, properties):
def _on_connect(client, userdata, flags, reason_code, properties):
"""Internal callback"""
if rc != 0:
raise mqtt.MQTTException(paho.connack_string(rc))
if reason_code != 0:
raise mqtt.MQTTException(paho.connack_string(reason_code))
if isinstance(userdata['topics'], list):
for topic in userdata['topics']:
@ -35,10 +34,6 @@ def _on_connect_v5(client, userdata, flags, rc, properties):
else:
client.subscribe(userdata['topics'], userdata['qos'])
def _on_connect(client, userdata, flags, rc):
"""Internal v5 callback"""
_on_connect_v5(client, userdata, flags, rc, None)
def _on_message_callback(client, userdata, message):
"""Internal callback"""
@ -77,40 +72,41 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
to a list of topics. Incoming messages are processed by the user provided
callback. This is a blocking function and will never return.
callback : function of the form "on_message(client, userdata, message)" for
:param callback: function with the same signature as `on_message` for
processing the messages received.
topics : either a string containing a single topic to subscribe to, or a
:param topics: either a string containing a single topic to subscribe to, or a
list of topics to subscribe to.
qos : the qos to use when subscribing. This is applied to all topics.
:param int qos: the qos to use when subscribing. This is applied to all topics.
userdata : passed to the callback
:param userdata: passed to the callback
hostname : a string containing the address of the broker to connect to.
:param str hostname: the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
:param int port: the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
:param str client_id: the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
:param int keepalive: the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
:param will: a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
:param auth: a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
:param tls: a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
@ -121,17 +117,17 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
:param str transport: set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
clean_session : a boolean that determines the client type. If True,
:param clean_session: a boolean that determines the client type. If True,
the broker will remove all information about this client
when it disconnects. If False, the client is a persistent
client and subscription information and queued messages
will be retained when the client disconnects.
Defaults to True.
proxy_args: a dictionary that will be given to the client.
:param proxy_args: a dictionary that will be given to the client.
"""
if qos < 0 or qos > 2:
@ -143,13 +139,17 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
'qos':qos,
'userdata':userdata}
client = paho.Client(client_id=client_id, userdata=callback_userdata,
protocol=protocol, transport=transport,
clean_session=clean_session)
client = paho.Client(
paho.CallbackAPIVersion.VERSION2,
client_id=client_id,
userdata=callback_userdata,
protocol=protocol,
transport=transport,
clean_session=clean_session,
)
client.enable_logger()
client.on_message = _on_message_callback
if protocol == mqtt.client.MQTTv5:
client.on_connect = _on_connect_v5
else:
client.on_connect = _on_connect
if proxy_args is not None:
@ -193,45 +193,45 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
to a list of topics. Once "msg_count" messages have been received, it
disconnects cleanly from the broker and returns the messages.
topics : either a string containing a single topic to subscribe to, or a
:param topics: either a string containing a single topic to subscribe to, or a
list of topics to subscribe to.
qos : the qos to use when subscribing. This is applied to all topics.
:param int qos: the qos to use when subscribing. This is applied to all topics.
msg_count : the number of messages to retrieve from the broker.
:param int msg_count: the number of messages to retrieve from the broker.
if msg_count == 1 then a single MQTTMessage will be returned.
if msg_count > 1 then a list of MQTTMessages will be returned.
retained : If set to True, retained messages will be processed the same as
:param bool retained: If set to True, retained messages will be processed the same as
non-retained messages. If set to False, retained messages will
be ignored. This means that with retained=False and msg_count=1,
the function will return the first message received that does
not have the retained flag set.
hostname : a string containing the address of the broker to connect to.
:param str hostname: the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
:param int port: the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
:param str client_id: the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
:param int keepalive: the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
:param will: a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
:param auth: a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
:param tls: a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
@ -242,17 +242,20 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
:param protocol: the MQTT protocol version to use. Defaults to MQTTv311.
:param transport: set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
clean_session : a boolean that determines the client type. If True,
:param clean_session: a boolean that determines the client type. If True,
the broker will remove all information about this client
when it disconnects. If False, the client is a persistent
client and subscription information and queued messages
will be retained when the client disconnects.
Defaults to True.
Defaults to True. If protocol is MQTTv50, clean_session
is ignored.
proxy_args: a dictionary that will be given to the client.
:param proxy_args: a dictionary that will be given to the client.
"""
if msg_count < 1:
@ -265,6 +268,10 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
else:
messages = []
# Ignore clean_session if protocol is MQTTv50, otherwise Client will raise
if protocol == paho.MQTTv5:
clean_session = None
userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages}
callback(_on_message_simple, topics, qos, userdata, hostname, port,

View file

@ -7,7 +7,7 @@
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
http://www.eclipse.org/legal/epl-v20.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
@ -16,14 +16,13 @@
*******************************************************************
"""
import sys
class MQTTException(Exception):
pass
class SubscribeOptions(object):
class SubscribeOptions:
"""The MQTT v5.0 subscribe options class.
The options are:
@ -42,7 +41,13 @@ class SubscribeOptions(object):
RETAIN_SEND_ON_SUBSCRIBE, RETAIN_SEND_IF_NEW_SUB, RETAIN_DO_NOT_SEND = range(
0, 3)
def __init__(self, qos=0, noLocal=False, retainAsPublished=False, retainHandling=RETAIN_SEND_ON_SUBSCRIBE):
def __init__(
self,
qos: int = 0,
noLocal: bool = False,
retainAsPublished: bool = False,
retainHandling: int = RETAIN_SEND_ON_SUBSCRIBE,
):
"""
qos: 0, 1 or 2. 0 is the default.
noLocal: True or False. False is the default and corresponds to MQTT v3.1.1 behavior.
@ -56,29 +61,27 @@ class SubscribeOptions(object):
self.noLocal = noLocal # bit 2
self.retainAsPublished = retainAsPublished # bit 3
self.retainHandling = retainHandling # bits 4 and 5: 0, 1 or 2
assert self.QoS in [0, 1, 2]
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2"
if self.retainHandling not in (0, 1, 2):
raise AssertionError(f"Retain handling should be 0, 1 or 2, not {self.retainHandling}")
if self.QoS not in (0, 1, 2):
raise AssertionError(f"QoS should be 0, 1 or 2, not {self.QoS}")
def __setattr__(self, name, value):
if name not in self.names:
raise MQTTException(
name + " Attribute name must be one of "+str(self.names))
f"{name} Attribute name must be one of {self.names}")
object.__setattr__(self, name, value)
def pack(self):
assert self.QoS in [0, 1, 2]
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2"
if self.retainHandling not in (0, 1, 2):
raise AssertionError(f"Retain handling should be 0, 1 or 2, not {self.retainHandling}")
if self.QoS not in (0, 1, 2):
raise AssertionError(f"QoS should be 0, 1 or 2, not {self.QoS}")
noLocal = 1 if self.noLocal else 0
retainAsPublished = 1 if self.retainAsPublished else 0
data = [(self.retainHandling << 4) | (retainAsPublished << 3) |
(noLocal << 2) | self.QoS]
if sys.version_info[0] >= 3:
buffer = bytes(data)
else:
buffer = bytearray(data)
return buffer
return bytes(data)
def unpack(self, buffer):
b0 = buffer[0]
@ -86,10 +89,10 @@ class SubscribeOptions(object):
self.retainAsPublished = True if ((b0 >> 3) & 0x01) == 1 else False
self.noLocal = True if ((b0 >> 2) & 0x01) == 1 else False
self.QoS = (b0 & 0x03)
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2, not %d" % self.retainHandling
assert self.QoS in [
0, 1, 2], "QoS should be 0, 1 or 2, not %d" % self.QoS
if self.retainHandling not in (0, 1, 2):
raise AssertionError(f"Retain handling should be 0, 1 or 2, not {self.retainHandling}")
if self.QoS not in (0, 1, 2):
raise AssertionError(f"QoS should be 0, 1 or 2, not {self.QoS}")
return 1
def __repr__(self):

View file

@ -26,7 +26,7 @@ Mako==1.3.2
MarkupSafe==2.1.3
musicbrainzngs==0.7.1
packaging==24.0
paho-mqtt==1.6.1
paho-mqtt==2.0.0
platformdirs==4.2.0
plexapi==4.15.10
portend==3.2.0