diff --git a/lib/paho/mqtt/__init__.py b/lib/paho/mqtt/__init__.py index d16f17f8..0d349fc3 100644 --- a/lib/paho/mqtt/__init__.py +++ b/lib/paho/mqtt/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.5.1" +__version__ = "1.6.1" class MQTTException(Exception): diff --git a/lib/paho/mqtt/client.py b/lib/paho/mqtt/client.py index 1085970e..1c0236e4 100644 --- a/lib/paho/mqtt/client.py +++ b/lib/paho/mqtt/client.py @@ -1,7 +1,7 @@ # Copyright (c) 2012-2019 Roger Light and others # # All rights reserved. This program and the accompanying materials -# are made available under the terms of the Eclipse Public License v1.0 +# are made available under the terms of the Eclipse Public License v2.0 # and Eclipse Distribution License v1.0 which accompany this distribution. # # The Eclipse Public License is available at @@ -13,19 +13,21 @@ # Roger Light - initial API and implementation # Ian Craggs - MQTT V5 support -from .subscribeoptions import SubscribeOptions -from .reasoncodes import ReasonCodes -from .properties import Properties -from .matcher import MQTTMatcher -import logging -import hashlib -import string import base64 -import uuid -import time -import threading -import sys +import hashlib +import logging +import string import struct +import sys +import threading +import time +import uuid + +from .matcher import MQTTMatcher +from .properties import Properties +from .reasoncodes import ReasonCodes +from .subscribeoptions import SubscribeOptions + """ This is an MQTT client module. MQTT is a lightweight pub/sub messaging protocol that is easy to implement and suitable for low powered devices. @@ -51,11 +53,12 @@ except ImportError: try: # Python 3 - from urllib import request as urllib_dot_request from urllib import parse as urllib_dot_parse + from urllib import request as urllib_dot_request except ImportError: # Python 2 import urllib as urllib_dot_request + import urlparse as urllib_dot_parse @@ -78,6 +81,12 @@ if platform.system() == 'Windows': else: EAGAIN = errno.EAGAIN +# Python 2.7 does not have BlockingIOError. Fall back to IOError +try: + BlockingIOError +except NameError: + BlockingIOError = IOError + MQTTv31 = 3 MQTTv311 = 4 MQTTv5 = 5 @@ -162,6 +171,7 @@ MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_UNKNOWN = 13 MQTT_ERR_ERRNO = 14 MQTT_ERR_QUEUE_SIZE = 15 +MQTT_ERR_KEEPALIVE = 16 MQTT_CLIENT = 0 MQTT_BRIDGE = 1 @@ -176,10 +186,6 @@ class WebsocketConnectionError(ValueError): pass -class WouldBlockError(Exception): - pass - - def error_string(mqtt_errno): """Return the error string associated with an mqtt error number.""" if mqtt_errno == MQTT_ERR_SUCCESS: @@ -214,6 +220,8 @@ def error_string(mqtt_errno): return "Error defined by errno." elif mqtt_errno == MQTT_ERR_QUEUE_SIZE: return "Message queue full." + elif mqtt_errno == MQTT_ERR_KEEPALIVE: + return "Client or broker did not communicate in the keepalive interval." else: return "Unknown error." @@ -278,9 +286,8 @@ def _socketpair_compat(): sock1.setblocking(0) try: sock1.connect(("127.0.0.1", port)) - except socket.error as err: - if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: - raise + except BlockingIOError: + pass sock2, address = listensock.accept() sock2.setblocking(0) listensock.close() @@ -335,19 +342,44 @@ class MQTTMessageInfo(object): self._published = True self._condition.notify() - def wait_for_publish(self): - """Block until the message associated with this object is published.""" + def wait_for_publish(self, timeout=None): + """Block until the message associated with this object is published, or + until the timeout occurs. If timeout is None, this will never time out. + Set timeout to a positive number of seconds, e.g. 1.2, to enable the + timeout. + + Raises ValueError if the message was not queued due to the outgoing + queue being full. + + Raises RuntimeError if the message was not published for another + reason. + """ if self.rc == MQTT_ERR_QUEUE_SIZE: raise ValueError('Message is not queued due to ERR_QUEUE_SIZE') + elif self.rc == MQTT_ERR_AGAIN: + pass + elif self.rc > 0: + raise RuntimeError('Message publish failed: %s' % (error_string(self.rc))) + + timeout_time = None if timeout is None else time.time() + timeout + timeout_tenth = None if timeout is None else timeout / 10. + def timed_out(): + return False if timeout is None else time.time() > timeout_time + with self._condition: - while not self._published: - self._condition.wait() + while not self._published and not timed_out(): + self._condition.wait(timeout_tenth) def is_published(self): """Returns True if the message associated with this object has been published, else returns False.""" if self.rc == MQTT_ERR_QUEUE_SIZE: raise ValueError('Message is not queued due to ERR_QUEUE_SIZE') + elif self.rc == MQTT_ERR_AGAIN: + pass + elif self.rc > 0: + raise RuntimeError('Message publish failed: %s' % (error_string(self.rc))) + with self._condition: return self._published @@ -358,14 +390,12 @@ class MQTTMessage(object): Members: - topic : String/bytes. topic that the message was published on. - payload : String/bytes the message payload. + topic : String. topic that the message was published on. + payload : Bytes/Byte array. the message payload. qos : Integer. The message Quality of Service 0, 1 or 2. retain : Boolean. If true, the message is a retained message and not fresh. mid : Integer. The message id. properties: Properties class. In MQTT v5.0, the properties associated with the message. - - On Python 3, topic must be bytes. """ __slots__ = 'timestamp', 'state', 'dup', 'mid', '_topic', 'payload', 'qos', 'retain', 'info', 'properties' @@ -426,11 +456,24 @@ class Client(object): broker. To use a callback, define a function and then assign it to the client: - def on_connect(client, userdata, flags, rc, properties=None): + def on_connect(client, userdata, flags, rc): print("Connection returned " + str(rc)) client.on_connect = on_connect + Callbacks can also be attached using decorators: + + client = paho.mqtt.Client() + + @client.connect_callback() + def on_connect(client, userdata, flags, rc): + print("Connection returned " + str(rc)) + + + **IMPORTANT** the required function signature for a callback can differ + depending on whether you are using MQTT v5 or MQTT v3.1.1/v3.1. See the + documentation for each callback. + All of the callbacks as described below have a "client" and an "userdata" argument. "client" is the Client instance that is calling the callback. "userdata" is user data of any type and can be set when creating a new client @@ -439,81 +482,16 @@ class Client(object): If you wish to suppress exceptions within a callback, you should set `client.suppress_exceptions = True` - The callbacks: + The callbacks are listed below, documentation for each of them can be found + at the same function name: - on_connect(client, userdata, flags, rc, properties=None): called when the broker responds to our connection - request. - flags is a dict that contains response flags from the broker: - flags['session present'] - this flag is useful for clients that are - using clean session set to 0 only. If a client with clean - session=0, that reconnects to a broker that it has previously - connected to, this flag indicates whether the broker still has the - session information for the client. If 1, the session still exists. - The value of rc determines success or not: - 0: Connection successful - 1: Connection refused - incorrect protocol version - 2: Connection refused - invalid client identifier - 3: Connection refused - server unavailable - 4: Connection refused - bad username or password - 5: Connection refused - not authorised - 6-255: Currently unused. - - on_disconnect(client, userdata, rc): called when the client disconnects from the broker. - The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS - (0), the callback was called in response to a disconnect() call. If any - other value the disconnection was unexpected, such as might be caused by - a network error. - - on_disconnect(client, userdata, rc, properties): called when the MQTT V5 client disconnects from the broker. - When using MQTT V5, the broker can send a disconnect message to the client. The - message can contain a reason code and MQTT V5 properties. The properties parameter could be - None if they do not exist in the disconnect message. - - on_message(client, userdata, message): called when a message has been received on a - topic that the client subscribes to. The message variable is a - MQTTMessage that describes all of the message parameters. - - on_publish(client, userdata, mid): called when a message that was to be sent using the - publish() call has completed transmission to the broker. For messages - with QoS levels 1 and 2, this means that the appropriate handshakes have - completed. For QoS 0, this simply means that the message has left the - client. The mid variable matches the mid variable returned from the - corresponding publish() call, to allow outgoing messages to be tracked. - This callback is important because even if the publish() call returns - success, it does not always mean that the message has been sent. - - on_subscribe(client, userdata, mid, granted_qos, properties=None): called when the broker responds to a - subscribe request. The mid variable matches the mid variable returned - from the corresponding subscribe() call. The granted_qos variable is a - list of integers that give the QoS level the broker has granted for each - of the different subscription requests. - - on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe - request. The mid variable matches the mid variable returned from the - corresponding unsubscribe() call. - - on_log(client, userdata, level, buf): called when the client has log information. Define - to allow debugging. The level variable gives the severity of the message - and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, - MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. - - on_socket_open(client, userdata, sock): Called when the socket has been opened. Use this - to register the socket with an external event loop for reading. - - on_socket_close(client, userdata, sock): Called when the socket is about to be closed. - Use this to unregister a socket from an external event loop for reading. - - on_socket_register_write(client, userdata, sock): Called when a write operation to the - socket failed because it would have blocked, e.g. output buffer full. Use this to - register the socket with an external event loop for writing. - - on_socket_unregister_write(client, userdata, sock): Called when a write operation to the - socket succeeded after it had previously failed. Use this to unregister the socket - from an external event loop for writing. + on_connect, on_connect_fail, on_disconnect, on_message, on_publish, + on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close, + on_socket_register_write, on_socket_unregister_write """ def __init__(self, client_id="", clean_session=None, userdata=None, - protocol=MQTTv311, transport="tcp"): + protocol=MQTTv311, transport="tcp", reconnect_on_failure=True): """client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then the behaviour is defined by which protocol version is in use. If using MQTT v3.1.1, then @@ -540,24 +518,13 @@ class Client(object): The protocol argument allows explicit setting of the MQTT version to use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1), - paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0), + paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0), with the default being v3.1.1. Set transport to "websockets" to use WebSockets as the transport mechanism. Set to "tcp" to use raw TCP, which is the default. """ - if protocol == MQTTv5: - if clean_session != None: - raise ValueError('Clean session is not used for MQTT 5.0') - else: - if clean_session == None: - clean_session = True - if not clean_session and (client_id == "" or client_id is None): - raise ValueError( - 'A client id must be provided if clean session is False.') - self._clean_session = clean_session - if transport.lower() not in ('websockets', 'tcp'): raise ValueError( 'transport must be "websockets" or "tcp", not %s' % transport) @@ -566,11 +533,21 @@ class Client(object): self._userdata = userdata self._sock = None self._sockpairR, self._sockpairW = (None, None,) - self._sockpairR, self._sockpairW = _socketpair_compat() self._keepalive = 60 - self._message_retry = 20 - self._last_retry_check = 0 + self._connect_timeout = 5.0 self._client_mode = MQTT_CLIENT + + if protocol == MQTTv5: + if clean_session is not None: + raise ValueError('Clean session is not used for MQTT 5.0') + else: + if clean_session is None: + clean_session = True + if not clean_session and (client_id == "" or client_id is None): + raise ValueError( + 'A client id must be provided if clean session is False.') + self._clean_session = clean_session + # [MQTT-3.1.3-4] Client Id must be UTF-8 encoded string. if client_id == "" or client_id is None: if protocol == MQTTv31: @@ -590,16 +567,16 @@ class Client(object): "remaining_count": [], "remaining_mult": 1, "remaining_length": 0, - "packet": b"", + "packet": bytearray(b""), "to_process": 0, "pos": 0} self._out_packet = collections.deque() - self._current_out_packet = None self._last_msg_in = time_func() self._last_msg_out = time_func() self._reconnect_min_delay = 1 self._reconnect_max_delay = 120 self._reconnect_delay = None + self._reconnect_on_failure = reconnect_on_failure self._ping_t = 0 self._last_mid = 0 self._state = mqtt_cs_new @@ -623,8 +600,6 @@ class Client(object): self._proxy = {} self._in_callback_mutex = threading.Lock() self._callback_mutex = threading.RLock() - self._out_packet_mutex = threading.Lock() - self._current_out_packet_mutex = threading.RLock() self._msgtime_mutex = threading.Lock() self._out_message_mutex = threading.RLock() self._in_message_mutex = threading.Lock() @@ -641,6 +616,7 @@ class Client(object): # No default callbacks self._on_log = None self._on_connect = None + self._on_connect_fail = None self._on_subscribe = None self._on_message = None self._on_publish = None @@ -662,29 +638,23 @@ class Client(object): def _sock_recv(self, bufsize): try: return self._sock.recv(bufsize) - except socket.error as err: - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_READ: - raise WouldBlockError() - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_WRITE: - self._call_socket_register_write() - raise WouldBlockError() - if err.errno == EAGAIN: - raise WouldBlockError() - raise + except ssl.SSLWantReadError: + raise BlockingIOError + except ssl.SSLWantWriteError: + self._call_socket_register_write() + raise BlockingIOError def _sock_send(self, buf): try: return self._sock.send(buf) - except socket.error as err: - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_READ: - raise WouldBlockError() - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_WRITE: - self._call_socket_register_write() - raise WouldBlockError() - if err.errno == EAGAIN: - self._call_socket_register_write() - raise WouldBlockError() - raise + except ssl.SSLWantReadError: + raise BlockingIOError + except ssl.SSLWantWriteError: + self._call_socket_register_write() + raise BlockingIOError + except BlockingIOError: + self._call_socket_register_write() + raise BlockingIOError def _sock_close(self): """Close the connection to the server.""" @@ -700,8 +670,9 @@ class Client(object): # In case a callback fails, still close the socket to avoid leaking the file descriptor. sock.close() - def _reset_sockets(self): - self._sock_close() + def _reset_sockets(self, sockpair_only=False): + if sockpair_only == False: + self._sock_close() if self._sockpairR: self._sockpairR.close() @@ -761,7 +732,7 @@ class Client(object): if hasattr(context, 'check_hostname'): self._tls_insecure = not context.check_hostname - def tls_set(self, ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=None, ciphers=None): + def tls_set(self, ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=None, ciphers=None, keyfile_password=None): """Configure network encryption and authentication options. Enables SSL/TLS support. ca_certs : a string path to the Certificate Authority certificate files @@ -769,7 +740,7 @@ class Client(object): option given then the client will operate in a similar manner to a web browser. That is to say it will require the broker to have a certificate signed by the Certificate Authorities in ca_certs and will - communicate using TLS v1, but will not attempt any form of + communicate using TLS v1,2, but will not attempt any form of authentication. This provides basic network encryption but may not be sufficient depending on how the broker is configured. By default, on Python 2.7.9+ or 3.4+, the default certification @@ -781,8 +752,11 @@ class Client(object): None then they will be used as client information for TLS based authentication. Support for this feature is broker dependent. Note that if either of these files in encrypted and needs a password to - decrypt it, Python will ask for the password at the command line. It is - not currently possible to define a callback to provide the password. + decrypt it, then this can be passed using the keyfile_password + argument - you should take precautions to ensure that your password is + not hard coded into your program by loading the password from a file + for example. If you do not provide keyfile_password, the password will + be requested to be typed in at a terminal window. cert_reqs allows the certificate requirements that the client imposes on the broker to be changed. By default this is ssl.CERT_REQUIRED, @@ -790,9 +764,8 @@ class Client(object): pydoc for more information on this parameter. tls_version allows the version of the SSL/TLS protocol used to be - specified. By default TLS v1 is used. Previous versions (all versions - beginning with SSL) are possible but not recommended due to possible - security problems. + specified. By default TLS v1.2 is used. Previous versions are allowed + but not recommended due to possible security problems. ciphers is a string specifying which encryption ciphers are allowable for this connection, or None to use the defaults. See the ssl pydoc for @@ -812,7 +785,7 @@ class Client(object): # Create SSLContext object if tls_version is None: - tls_version = ssl.PROTOCOL_TLSv1 + tls_version = ssl.PROTOCOL_TLSv1_2 # If the python version supports it, use highest TLS version automatically if hasattr(ssl, "PROTOCOL_TLS"): tls_version = ssl.PROTOCOL_TLS @@ -820,7 +793,7 @@ class Client(object): # Configure context if certfile is not None: - context.load_cert_chain(certfile, keyfile) + context.load_cert_chain(certfile, keyfile, keyfile_password) if cert_reqs == ssl.CERT_NONE and hasattr(context, 'check_hostname'): context.check_hostname = False @@ -920,7 +893,7 @@ class Client(object): keepalive: Maximum period in seconds between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker. - clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. + clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only, respectively. MQTT session data (such as outstanding messages and subscriptions) is cleared on successful connect when the clean_start flag is set. @@ -994,7 +967,7 @@ class Client(object): keepalive: Maximum period in seconds between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker. - clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. + clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only, respectively. MQTT session data (such as outstanding messages and subscriptions) is cleared on successful connect when the clean_start flag is set. @@ -1021,7 +994,7 @@ class Client(object): self._clean_start = clean_start self._connect_properties = properties self._state = mqtt_cs_connect_async - + def reconnect_delay_set(self, min_delay=1, max_delay=120): """ Configure the exponential reconnect delay @@ -1050,15 +1023,11 @@ class Client(object): "remaining_count": [], "remaining_mult": 1, "remaining_length": 0, - "packet": b"", + "packet": bytearray(b""), "to_process": 0, "pos": 0} - with self._out_packet_mutex: - self._out_packet = collections.deque() - - with self._current_out_packet_mutex: - self._current_out_packet = None + self._out_packet = collections.deque() with self._msgtime_mutex: self._last_msg_in = time_func() @@ -1121,6 +1090,11 @@ class Client(object): def loop(self, timeout=1.0, max_packets=1): """Process network events. + It is strongly recommended that you use loop_start(), or + loop_forever(), or if you are using an external event loop using + loop_read(), loop_write(), and loop_misc(). Using loop() on it's own is + no longer recommended. + This function must be called regularly to ensure communication with the broker is carried out. It calls select() on the network socket to wait for network events. If incoming data is present it will then be @@ -1138,18 +1112,23 @@ class Client(object): Returns >0 on error. A ValueError will be raised if timeout < 0""" + + if self._sockpairR is None or self._sockpairW is None: + self._reset_sockets(sockpair_only=True) + self._sockpairR, self._sockpairW = _socketpair_compat() + + return self._loop(timeout) + + def _loop(self, timeout=1.0): if timeout < 0.0: raise ValueError('Invalid timeout.') - with self._current_out_packet_mutex: - with self._out_packet_mutex: - if self._current_out_packet is None and len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.popleft() - - if self._current_out_packet: - wlist = [self._sock] - else: - wlist = [] + try: + packet = self._out_packet.popleft() + self._out_packet.appendleft(packet) + wlist = [self._sock] + except IndexError: + wlist = [] # used to check if there are any bytes left in the (SSL) socket pending_bytes = 0 @@ -1162,7 +1141,11 @@ class Client(object): # sockpairR is used to break out of select() before the timeout, on a # call to publish() etc. - rlist = [self._sock, self._sockpairR] + if self._sockpairR is None: + rlist = [self._sock] + else: + rlist = [self._sock, self._sockpairR] + try: socklist = select.select(rlist, wlist, [], timeout) except TypeError: @@ -1178,23 +1161,24 @@ class Client(object): return MQTT_ERR_UNKNOWN if self._sock in socklist[0] or pending_bytes > 0: - rc = self.loop_read(max_packets) + rc = self.loop_read() if rc or self._sock is None: return rc - if self._sockpairR in socklist[0]: + if self._sockpairR and self._sockpairR in socklist[0]: # Stimulate output write even though we didn't ask for it, because # at that point the publish or other command wasn't present. socklist[1].insert(0, self._sock) # Clear sockpairR - only ever a single byte written. try: - self._sockpairR.recv(1) - except socket.error as err: - if err.errno != EAGAIN: - raise + # Read many bytes at once - this allows up to 10000 calls to + # publish() inbetween calls to loop(). + self._sockpairR.recv(10000) + except BlockingIOError: + pass if self._sock in socklist[1]: - rc = self.loop_write(max_packets) + rc = self.loop_write() if rc or self._sock is None: return rc @@ -1234,7 +1218,7 @@ class Client(object): is defined. A ValueError will be raised if topic is None, has zero length or is - invalid (contains a wildcard), except if the MQTT version used is v5.0. + invalid (contains a wildcard), except if the MQTT version used is v5.0. For v5.0, a zero length topic can be used when a Topic Alias has been set. A ValueError will be raised if qos is not one of 0, 1 or 2, or if @@ -1321,10 +1305,10 @@ class Client(object): Must be called before connect() to have any effect. Requires a broker that supports MQTT v3.1. - username: The username to authenticate with. Need have no relationship to the client id. Must be unicode + username: The username to authenticate with. Need have no relationship to the client id. Must be unicode [MQTT-3.1.3-11]. Set to None to reset client back to not using username/password for broker authentication. - password: The password to authenticate with. Optional, set to None if not required. If it is unicode, then it + password: The password to authenticate with. Optional, set to None if not required. If it is unicode, then it will be encoded as UTF-8. """ @@ -1361,7 +1345,7 @@ class Client(object): def disconnect(self, reasoncode=None, properties=None): """Disconnect a connected client from the broker. reasoncode: (MQTT v5.0 only) a ReasonCodes instance setting the MQTT v5.0 - reasoncode to be sent with the disconnect. It is optional, the receiver + reasoncode to be sent with the disconnect. It is optional, the receiver then assuming that 0 (success) is the value. properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties to be included. Optional - if not set, no properties are sent. @@ -1378,7 +1362,7 @@ class Client(object): This function may be called in three different ways (and a further three for MQTT v5.0): - Simple string and integer + Simple string and integer ------------------------- e.g. subscribe("my/topic", 2) @@ -1468,7 +1452,7 @@ class Client(object): if qos < 0 or qos > 2: raise ValueError('Invalid QoS level.') if self._protocol == MQTTv5: - if options == None: + if options is None: # if no options are provided, use the QoS passed instead options = SubscribeOptions(qos=qos) elif qos != 0: @@ -1589,18 +1573,14 @@ class Client(object): if self._sock is None: return MQTT_ERR_NO_CONN - max_packets = len(self._out_packet) + 1 - if max_packets < 1: - max_packets = 1 - try: - for _ in range(0, max_packets): - rc = self._packet_write() - if rc > 0: - return self._loop_rc_handle(rc) - elif rc == MQTT_ERR_AGAIN: - return MQTT_ERR_SUCCESS - return MQTT_ERR_SUCCESS + rc = self._packet_write() + if rc == MQTT_ERR_AGAIN: + return MQTT_ERR_SUCCESS + elif rc > 0: + return self._loop_rc_handle(rc) + else: + return MQTT_ERR_SUCCESS finally: if self.want_write(): self._call_socket_register_write() @@ -1611,9 +1591,11 @@ class Client(object): """Call to determine if there is network data waiting to be written. Useful if you are calling select() yourself rather than using loop(). """ - if self._current_out_packet or len(self._out_packet) > 0: + try: + packet = self._out_packet.popleft() + self._out_packet.appendleft(packet) return True - else: + except IndexError: return False def loop_misc(self): @@ -1626,10 +1608,6 @@ class Client(object): now = time_func() self._check_keepalive() - if self._last_retry_check + 1 < now: - # Only check once a second at most - self._message_retry_check() - self._last_retry_check = now if self._ping_t > 0 and now - self._ping_t >= self._keepalive: # client->ping_t != 0 means we are waiting for a pingresp. @@ -1639,7 +1617,7 @@ class Client(object): if self._state == mqtt_cs_disconnecting: rc = MQTT_ERR_SUCCESS else: - rc = 1 + rc = MQTT_ERR_KEEPALIVE self._do_on_disconnect(rc) @@ -1665,12 +1643,8 @@ class Client(object): return self def message_retry_set(self, retry): - """Set the timeout in seconds before a message with QoS>0 is retried. - 20 seconds by default.""" - if retry < 0: - raise ValueError('Invalid retry.') - - self._message_retry = retry + """No longer used, remove in version 2.0""" + pass def user_data_set(self, userdata): """Set the user data variable passed to callbacks. May be any data type.""" @@ -1739,20 +1713,22 @@ class Client(object): return self._sock def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False): - """This function call loop() for you in an infinite blocking loop. It - is useful for the case where you only want to run the MQTT client loop - in your program. + """This function calls the network loop functions for you in an + infinite blocking loop. It is useful for the case where you only want + to run the MQTT client loop in your program. - loop_forever() will handle reconnecting for you. If you call - disconnect() in a callback it will return. + loop_forever() will handle reconnecting for you if reconnect_on_failure is + true (this is the default behavior). If you call disconnect() in a callback + it will return. timeout: The time in seconds to wait for incoming/outgoing network traffic before timing out and returning. max_packets: Not currently used. retry_first_connection: Should the first connection attempt be retried on failure. + This is independent of the reconnect_on_failure setting. - Raises socket.error on first connection failures unless retry_first_connection=True + Raises OSError/WebsocketConnectionError on first connection failures unless retry_first_connection=True """ run = True @@ -1764,7 +1740,8 @@ class Client(object): if self._state == mqtt_cs_connect_async: try: self.reconnect() - except (socket.error, OSError, WebsocketConnectionError): + except (OSError, WebsocketConnectionError): + self._handle_on_connect_fail() if not retry_first_connection: raise self._easy_log( @@ -1776,14 +1753,12 @@ class Client(object): while run: rc = MQTT_ERR_SUCCESS while rc == MQTT_ERR_SUCCESS: - rc = self.loop(timeout, max_packets) + rc = self._loop(timeout) # We don't need to worry about locking here, because we've # either called loop_forever() when in single threaded mode, or # in multi threaded mode when loop_stop() has been called and - # so no other threads can access _current_out_packet, - # _out_packet or _messages. + # so no other threads can access _out_packet or _messages. if (self._thread_terminate is True - and self._current_out_packet is None and len(self._out_packet) == 0 and len(self._out_messages) == 0): rc = 1 @@ -1792,7 +1767,7 @@ class Client(object): def should_exit(): return self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True - if should_exit(): + if should_exit() or not self._reconnect_on_failure: run = False else: self._reconnect_wait() @@ -1802,7 +1777,8 @@ class Client(object): else: try: self.reconnect() - except (socket.error, OSError, WebsocketConnectionError): + except (OSError, WebsocketConnectionError): + self._handle_on_connect_fail() self._easy_log( MQTT_LOG_DEBUG, "Connection failed, retrying") @@ -1816,6 +1792,7 @@ class Client(object): if self._thread is not None: return MQTT_ERR_INVAL + self._sockpairR, self._sockpairW = _socketpair_compat() self._thread_terminate = False self._thread = threading.Thread(target=self._thread_main) self._thread.daemon = True @@ -1855,9 +1832,18 @@ class Client(object): MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, and MQTT_LOG_DEBUG. buf: the message itself + + Decorator: @client.log_callback() (```client``` is the name of the + instance which this callback is being attached to) """ self._on_log = func + def log_callback(self): + def decorator(func): + self.on_log = func + return func + return decorator + @property def on_connect(self): """If implemented, called when the broker responds to our connection @@ -1869,7 +1855,7 @@ class Client(object): """ Define the connect callback implementation. Expected signature for MQTT v3.1 and v3.1.1 is: - connect_callback(client, userdata, flags, rc, properties=None) + connect_callback(client, userdata, flags, rc) and for MQTT v5.0: connect_callback(client, userdata, flags, reasonCode, properties) @@ -1879,11 +1865,11 @@ class Client(object): flags: response flags sent by the broker rc: the connection result reasonCode: the MQTT v5.0 reason code: an instance of the ReasonCode class. - ReasonCode may be compared to interger. + ReasonCode may be compared to integer. properties: the MQTT v5.0 properties returned from the broker. An instance of the Properties class. For MQTT v3.1 and v3.1.1 properties is not provided but for compatibility - with MQTT v5.0, we recommand adding properties=None. + with MQTT v5.0, we recommend adding properties=None. flags is a dict that contains response flags from the broker: flags['session present'] - this flag is useful for clients that are @@ -1900,10 +1886,49 @@ class Client(object): 4: Connection refused - bad username or password 5: Connection refused - not authorised 6-255: Currently unused. + + Decorator: @client.connect_callback() (```client``` is the name of the + instance which this callback is being attached to) + """ with self._callback_mutex: self._on_connect = func + def connect_callback(self): + def decorator(func): + self.on_connect = func + return func + return decorator + + @property + def on_connect_fail(self): + """If implemented, called when the client failed to connect + to the broker.""" + return self._on_connect_fail + + @on_connect_fail.setter + def on_connect_fail(self, func): + """ Define the connection failure callback implementation + + Expected signature is: + on_connect_fail(client, userdata) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + + Decorator: @client.connect_fail_callback() (```client``` is the name of the + instance which this callback is being attached to) + + """ + with self._callback_mutex: + self._on_connect_fail = func + + def connect_fail_callback(self): + def decorator(func): + self.on_connect_fail = func + return func + return decorator + @property def on_subscribe(self): """If implemented, called when the broker responds to a subscribe @@ -1912,10 +1937,10 @@ class Client(object): @on_subscribe.setter def on_subscribe(self, func): - """ Define the suscribe callback implementation. + """ Define the subscribe callback implementation. Expected signature for MQTT v3.1.1 and v3.1 is: - subscribe_callback(client, userdata, mid, granted_qos, properties=None) + subscribe_callback(client, userdata, mid, granted_qos) and for MQTT v5.0: subscribe_callback(client, userdata, mid, reasonCodes, properties) @@ -1930,10 +1955,19 @@ class Client(object): subscription. A list of ReasonCodes instances. properties: the MQTT v5.0 properties received from the broker. A list of Properties class instances. + + Decorator: @client.subscribe_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_subscribe = func + def subscribe_callback(self): + def decorator(func): + self.on_subscribe = func + return func + return decorator + @property def on_message(self): """If implemented, called when a message has been received on a topic @@ -1955,10 +1989,20 @@ class Client(object): userdata: the private user data as set in Client() or userdata_set() message: an instance of MQTTMessage. This is a class with members topic, payload, qos, retain. + + Decorator: @client.message_callback() (```client``` is the name of the + instance which this callback is being attached to) + """ with self._callback_mutex: self._on_message = func + def message_callback(self): + def decorator(func): + self.on_message = func + return func + return decorator + @property def on_publish(self): """If implemented, called when a message that was to be sent using the @@ -1982,10 +2026,20 @@ class Client(object): userdata: the private user data as set in Client() or userdata_set() mid: matches the mid variable returned from the corresponding publish() call, to allow outgoing messages to be tracked. + + Decorator: @client.publish_callback() (```client``` is the name of the + instance which this callback is being attached to) + """ with self._callback_mutex: self._on_publish = func + def publish_callback(self): + def decorator(func): + self.on_publish = func + return func + return decorator + @property def on_unsubscribe(self): """If implemented, called when the broker responds to an unsubscribe @@ -2010,10 +2064,19 @@ class Client(object): list of Properties class instances. reasonCodes: the MQTT v5.0 reason codes received from the broker for each unsubscribe topic. A list of ReasonCodes instances + + Decorator: @client.unsubscribe_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_unsubscribe = func + def unsubscribe_callback(self): + def decorator(func): + self.on_unsubscribe = func + return func + return decorator + @property def on_disconnect(self): """If implemented, called when the client disconnects from the broker. @@ -2037,10 +2100,20 @@ class Client(object): MQTT_ERR_SUCCESS (0), the callback was called in response to a disconnect() call. If any other value the disconnection was unexpected, such as might be caused by a network error. + + Decorator: @client.disconnect_callback() (```client``` is the name of the + instance which this callback is being attached to) + """ with self._callback_mutex: self._on_disconnect = func + def disconnect_callback(self): + def decorator(func): + self.on_disconnect = func + return func + return decorator + @property def on_socket_open(self): """If implemented, called just after the socket was opend.""" @@ -2058,22 +2131,33 @@ class Client(object): client: the client instance for this callback userdata: the private user data as set in Client() or userdata_set() sock: the socket which was just opened. + + Decorator: @client.socket_open_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_socket_open = func + def socket_open_callback(self): + def decorator(func): + self.on_socket_open = func + return func + return decorator + def _call_socket_open(self): """Call the socket_open callback with the just-opened socket""" with self._callback_mutex: - if self.on_socket_open: - with self._in_callback_mutex: - try: - self.on_socket_open(self, self._userdata, self._sock) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_socket_open: %s', err) - if not self.suppress_exceptions: - raise + on_socket_open = self.on_socket_open + + if on_socket_open: + with self._in_callback_mutex: + try: + on_socket_open(self, self._userdata, self._sock) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_socket_open: %s', err) + if not self.suppress_exceptions: + raise @property def on_socket_close(self): @@ -2092,22 +2176,33 @@ class Client(object): client: the client instance for this callback userdata: the private user data as set in Client() or userdata_set() sock: the socket which is about to be closed. + + Decorator: @client.socket_close_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_socket_close = func + def socket_close_callback(self): + def decorator(func): + self.on_socket_close = func + return func + return decorator + def _call_socket_close(self, sock): """Call the socket_close callback with the about-to-be-closed socket""" with self._callback_mutex: - if self.on_socket_close: - with self._in_callback_mutex: - try: - self.on_socket_close(self, self._userdata, sock) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_socket_close: %s', err) - if not self.suppress_exceptions: - raise + on_socket_close = self.on_socket_close + + if on_socket_close: + with self._in_callback_mutex: + try: + on_socket_close(self, self._userdata, sock) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_socket_close: %s', err) + if not self.suppress_exceptions: + raise @property def on_socket_register_write(self): @@ -2126,25 +2221,36 @@ class Client(object): client: the client instance for this callback userdata: the private user data as set in Client() or userdata_set() sock: the socket which should be registered for writing + + Decorator: @client.socket_register_write_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_socket_register_write = func + def socket_register_write_callback(self): + def decorator(func): + self._on_socket_register_write = func + return func + return decorator + def _call_socket_register_write(self): """Call the socket_register_write callback with the unwritable socket""" if not self._sock or self._registered_write: return self._registered_write = True with self._callback_mutex: - if self.on_socket_register_write: - try: - self.on_socket_register_write( - self, self._userdata, self._sock) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_socket_register_write: %s', err) - if not self.suppress_exceptions: - raise + on_socket_register_write = self.on_socket_register_write + + if on_socket_register_write: + try: + on_socket_register_write( + self, self._userdata, self._sock) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_socket_register_write: %s', err) + if not self.suppress_exceptions: + raise @property def on_socket_unregister_write(self): @@ -2163,10 +2269,19 @@ class Client(object): client: the client instance for this callback userdata: the private user data as set in Client() or userdata_set() sock: the socket which should be unregistered for writing + + Decorator: @client.socket_unregister_write_callback() (```client``` is the name of the + instance which this callback is being attached to) """ with self._callback_mutex: self._on_socket_unregister_write = func + def socket_unregister_write_callback(self): + def decorator(func): + self._on_socket_unregister_write = func + return func + return decorator + def _call_socket_unregister_write(self, sock=None): """Call the socket_unregister_write callback with the writable socket""" sock = sock or self._sock @@ -2175,14 +2290,16 @@ class Client(object): self._registered_write = False with self._callback_mutex: - if self.on_socket_unregister_write: - try: - self.on_socket_unregister_write(self, self._userdata, sock) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_socket_unregister_write: %s', err) - if not self.suppress_exceptions: - raise + on_socket_unregister_write = self.on_socket_unregister_write + + if on_socket_unregister_write: + try: + on_socket_unregister_write(self, self._userdata, sock) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_socket_unregister_write: %s', err) + if not self.suppress_exceptions: + raise def message_callback_add(self, sub, callback): """Register a message callback for a specific topic. @@ -2201,6 +2318,12 @@ class Client(object): with self._callback_mutex: self._on_message_filtered[sub] = callback + def topic_callback(self, sub): + def decorator(func): + self.message_callback_add(sub, func) + return func + return decorator + def message_callback_remove(self, sub): """Remove a message callback previously registered with message_callback_add().""" @@ -2245,15 +2368,15 @@ class Client(object): if self._in_packet['command'] == 0: try: command = self._sock_recv(1) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) - return 1 + return MQTT_ERR_CONN_LOST else: if len(command) == 0: - return 1 + return MQTT_ERR_CONN_LOST command, = struct.unpack("!B", command) self._in_packet['command'] = command @@ -2264,15 +2387,15 @@ class Client(object): while True: try: byte = self._sock_recv(1) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) - return 1 + return MQTT_ERR_CONN_LOST else: if len(byte) == 0: - return 1 + return MQTT_ERR_CONN_LOST byte, = struct.unpack("!B", byte) self._in_packet['remaining_count'].append(byte) # Max 4 bytes length for remaining length as defined by protocol. @@ -2290,20 +2413,26 @@ class Client(object): self._in_packet['have_remaining'] = 1 self._in_packet['to_process'] = self._in_packet['remaining_length'] + count = 100 # Don't get stuck in this loop if we have a huge message. while self._in_packet['to_process'] > 0: try: data = self._sock_recv(self._in_packet['to_process']) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) - return 1 + return MQTT_ERR_CONN_LOST else: if len(data) == 0: - return 1 + return MQTT_ERR_CONN_LOST self._in_packet['to_process'] -= len(data) self._in_packet['packet'] += data + count -= 1 + if count == 0: + with self._msgtime_mutex: + self._last_msg_in = time_func() + return MQTT_ERR_AGAIN # All data for this packet is read. self._in_packet['pos'] = 0 @@ -2316,7 +2445,7 @@ class Client(object): 'remaining_count': [], 'remaining_mult': 1, 'remaining_length': 0, - 'packet': b"", + 'packet': bytearray(b""), 'to_process': 0, 'pos': 0} @@ -2325,25 +2454,26 @@ class Client(object): return rc def _packet_write(self): - self._current_out_packet_mutex.acquire() - - while self._current_out_packet: - packet = self._current_out_packet + while True: + try: + packet = self._out_packet.popleft() + except IndexError: + return MQTT_ERR_SUCCESS try: write_length = self._sock_send( packet['packet'][packet['pos']:]) except (AttributeError, ValueError): - self._current_out_packet_mutex.release() + self._out_packet.appendleft(packet) return MQTT_ERR_SUCCESS - except WouldBlockError: - self._current_out_packet_mutex.release() + except BlockingIOError: + self._out_packet.appendleft(packet) return MQTT_ERR_AGAIN - except socket.error as err: - self._current_out_packet_mutex.release() + except ConnectionError as err: + self._out_packet.appendleft(packet) self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) - return 1 + return MQTT_ERR_CONN_LOST if write_length > 0: packet['to_process'] -= write_length @@ -2352,40 +2482,35 @@ class Client(object): if packet['to_process'] == 0: if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: with self._callback_mutex: - if self.on_publish: - with self._in_callback_mutex: - try: - self.on_publish( - self, self._userdata, packet['mid']) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) - if not self.suppress_exceptions: - raise + on_publish = self.on_publish + + if on_publish: + with self._in_callback_mutex: + try: + on_publish( + self, self._userdata, packet['mid']) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) + if not self.suppress_exceptions: + raise packet['info']._set_as_published() if (packet['command'] & 0xF0) == DISCONNECT: - self._current_out_packet_mutex.release() - with self._msgtime_mutex: self._last_msg_out = time_func() - self._do_on_disconnect(0) - + self._do_on_disconnect(MQTT_ERR_SUCCESS) self._sock_close() return MQTT_ERR_SUCCESS - with self._out_packet_mutex: - if len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.popleft() - else: - self._current_out_packet = None + else: + # We haven't finished with this packet + self._out_packet.appendleft(packet) else: break - self._current_out_packet_mutex.release() - with self._msgtime_mutex: self._last_msg_out = time_func() @@ -2415,17 +2540,22 @@ class Client(object): if self._sock is not None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): if self._state == mqtt_cs_connected and self._ping_t == 0: - self._send_pingreq() - with self._msgtime_mutex: - self._last_msg_out = now - self._last_msg_in = now + try: + self._send_pingreq() + except Exception: + self._sock_close() + self._do_on_disconnect(MQTT_ERR_CONN_LOST) + else: + with self._msgtime_mutex: + self._last_msg_out = now + self._last_msg_in = now else: self._sock_close() if self._state == mqtt_cs_disconnecting: rc = MQTT_ERR_SUCCESS else: - rc = 1 + rc = MQTT_ERR_KEEPALIVE self._do_on_disconnect(rc) @@ -2542,7 +2672,7 @@ class Client(object): remaining_length += 2 if self._protocol == MQTTv5: - if properties == None: + if properties is None: packed_properties = b'\x00' else: packed_properties = properties.pack() @@ -2616,13 +2746,13 @@ class Client(object): remaining_length += 2 + len(self._password) if self._protocol == MQTTv5: - if self._connect_properties == None: + if self._connect_properties is None: packed_connect_properties = b'\x00' else: packed_connect_properties = self._connect_properties.pack() remaining_length += len(packed_connect_properties) if self._will: - if self._will_properties == None: + if self._will_properties is None: packed_will_properties = b'\x00' else: packed_will_properties = self._will_properties.pack() @@ -2704,11 +2834,11 @@ class Client(object): packet.append(command) if self._protocol == MQTTv5: - if properties != None or reasoncode != None: - if reasoncode == None: + if properties is not None or reasoncode is not None: + if reasoncode is None: reasoncode = ReasonCodes(DISCONNECT >> 4, identifier=0) remaining_length += 1 - if properties != None: + if properties is not None: packed_props = properties.pack() remaining_length += len(packed_props) @@ -2725,7 +2855,7 @@ class Client(object): def _send_subscribe(self, dup, topics, properties=None): remaining_length = 2 if self._protocol == MQTTv5: - if properties == None: + if properties is None: packed_subscribe_properties = b'\x00' else: packed_subscribe_properties = properties.pack() @@ -2762,7 +2892,7 @@ class Client(object): def _send_unsubscribe(self, dup, topics, properties=None): remaining_length = 2 if self._protocol == MQTTv5: - if properties == None: + if properties is None: packed_unsubscribe_properties = b'\x00' else: packed_unsubscribe_properties = properties.pack() @@ -2803,36 +2933,6 @@ class Client(object): ) return (self._packet_queue(command, packet, local_mid, 1), local_mid) - def _message_retry_check_actual(self, messages, mutex): - with mutex: - now = time_func() - for m in messages.values(): - if m.timestamp + self._message_retry < now: - if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: - m.timestamp = now - m.dup = True - self._send_publish( - m.mid, - m.topic.encode('utf-8'), - m.payload, - m.qos, - m.retain, - m.dup, - properties=m.properties, - ) - elif m.state == mqtt_ms_wait_for_pubrel: - m.timestamp = now - self._send_pubrec(m.mid) - elif m.state == mqtt_ms_wait_for_pubcomp: - m.timestamp = now - self._send_pubrel(m.mid) - - def _message_retry_check(self): - self._message_retry_check_actual( - self._out_messages, self._out_message_mutex) - self._message_retry_check_actual( - self._in_messages, self._in_message_mutex) - def _check_clean_session(self): if self._protocol == MQTTv5: if self._clean_start == MQTT_CLEAN_START_FIRST_ONLY: @@ -2898,22 +2998,19 @@ class Client(object): 'packet': packet, 'info': info} - with self._out_packet_mutex: - self._out_packet.append(mpkt) - if self._current_out_packet_mutex.acquire(False): - if self._current_out_packet is None and len(self._out_packet) > 0: - self._current_out_packet = self._out_packet.popleft() - self._current_out_packet_mutex.release() + self._out_packet.append(mpkt) # Write a single byte to sockpairW (connected to sockpairR) to break # out of select() if in threaded mode. - try: - self._sockpairW.send(sockpair_data) - except socket.error as err: - if err.errno != EAGAIN: - raise + if self._sockpairW is not None: + try: + self._sockpairW.send(sockpair_data) + except BlockingIOError: + pass - if self._thread is None: + # If we have an external event loop registered, use that instead + # of calling loop_write() directly. + if self._thread is None and self._on_socket_register_write is None: if self._in_callback_mutex.acquire(False): self._in_callback_mutex.release() return self.loop_write() @@ -2977,13 +3074,21 @@ class Client(object): if self._protocol == MQTTv5: (flags, result) = struct.unpack( "!BB", self._in_packet['packet'][:2]) - reason = ReasonCodes(CONNACK >> 4, identifier=result) - properties = Properties(CONNACK >> 4) - properties.unpack(self._in_packet['packet'][2:]) + if result == 1: + # This is probably a failure from a broker that doesn't support + # MQTT v5. + reason = 132 # Unsupported protocol version + properties = None + else: + reason = ReasonCodes(CONNACK >> 4, identifier=result) + properties = Properties(CONNACK >> 4) + properties.unpack(self._in_packet['packet'][2:]) else: (flags, result) = struct.unpack("!BB", self._in_packet['packet']) if self._protocol == MQTTv311: if result == CONNACK_REFUSED_PROTOCOL_VERSION: + if not self._reconnect_on_failure: + return MQTT_ERR_PROTOCOL self._easy_log( MQTT_LOG_DEBUG, "Received CONNACK (%s, %s), attempting downgrade to MQTT v3.1.", @@ -2994,6 +3099,8 @@ class Client(object): return self.reconnect() elif (result == CONNACK_REFUSED_IDENTIFIER_REJECTED and self._client_id == b''): + if not self._reconnect_on_failure: + return MQTT_ERR_PROTOCOL self._easy_log( MQTT_LOG_DEBUG, "Received CONNACK (%s, %s), attempting to use non-empty CID", @@ -3017,22 +3124,24 @@ class Client(object): self._mqttv5_first_connect = False with self._callback_mutex: - if self.on_connect: - flags_dict = {} - flags_dict['session present'] = flags & 0x01 - with self._in_callback_mutex: - try: - if self._protocol == MQTTv5: - self.on_connect(self, self._userdata, - flags_dict, reason, properties) - else: - self.on_connect( - self, self._userdata, flags_dict, result) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_connect: %s', err) - if not self.suppress_exceptions: - raise + on_connect = self.on_connect + + if on_connect: + flags_dict = {} + flags_dict['session present'] = flags & 0x01 + with self._in_callback_mutex: + try: + if self._protocol == MQTTv5: + on_connect(self, self._userdata, + flags_dict, reason, properties) + else: + on_connect( + self, self._userdata, flags_dict, result) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_connect: %s', err) + if not self.suppress_exceptions: + raise if result == 0: rc = 0 @@ -3140,20 +3249,22 @@ class Client(object): granted_qos = struct.unpack(pack_format, packet) with self._callback_mutex: - if self.on_subscribe: - with self._in_callback_mutex: # Don't call loop_write after _send_publish() - try: - if self._protocol == MQTTv5: - self.on_subscribe( - self, self._userdata, mid, reasoncodes, properties) - else: - self.on_subscribe( - self, self._userdata, mid, granted_qos) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_subscribe: %s', err) - if not self.suppress_exceptions: - raise + on_subscribe = self.on_subscribe + + if on_subscribe: + with self._in_callback_mutex: # Don't call loop_write after _send_publish() + try: + if self._protocol == MQTTv5: + on_subscribe( + self, self._userdata, mid, reasoncodes, properties) + else: + on_subscribe( + self, self._userdata, mid, granted_qos) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_subscribe: %s', err) + if not self.suppress_exceptions: + raise return MQTT_ERR_SUCCESS @@ -3216,9 +3327,8 @@ class Client(object): self._handle_on_message(message) return MQTT_ERR_SUCCESS elif message.qos == 1: - rc = self._send_puback(message.mid) self._handle_on_message(message) - return rc + return self._send_puback(message.mid) elif message.qos == 2: rc = self._send_pubrec(message.mid) message.state = mqtt_ms_wait_for_pubrel @@ -3333,48 +3443,55 @@ class Client(object): self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid) with self._callback_mutex: - if self.on_unsubscribe: - with self._in_callback_mutex: - try: - if self._protocol == MQTTv5: - self.on_unsubscribe( - self, self._userdata, mid, properties, reasoncodes) - else: - self.on_unsubscribe(self, self._userdata, mid) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_unsubscribe: %s', err) - if not self.suppress_exceptions: - raise + on_unsubscribe = self.on_unsubscribe + + if on_unsubscribe: + with self._in_callback_mutex: + try: + if self._protocol == MQTTv5: + on_unsubscribe( + self, self._userdata, mid, properties, reasoncodes) + else: + on_unsubscribe(self, self._userdata, mid) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_unsubscribe: %s', err) + if not self.suppress_exceptions: + raise + return MQTT_ERR_SUCCESS def _do_on_disconnect(self, rc, properties=None): with self._callback_mutex: - if self.on_disconnect: - with self._in_callback_mutex: - try: - if properties: - self.on_disconnect( - self, self._userdata, rc, properties) - else: - self.on_disconnect(self, self._userdata, rc) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_disconnect: %s', err) - if not self.suppress_exceptions: - raise + on_disconnect = self.on_disconnect + + if on_disconnect: + with self._in_callback_mutex: + try: + if self._protocol == MQTTv5: + on_disconnect( + self, self._userdata, rc, properties) + else: + on_disconnect(self, self._userdata, rc) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_disconnect: %s', err) + if not self.suppress_exceptions: + raise def _do_on_publish(self, mid): with self._callback_mutex: - if self.on_publish: - with self._in_callback_mutex: - try: - self.on_publish(self, self._userdata, mid) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) - if not self.suppress_exceptions: - raise + on_publish = self.on_publish + + if on_publish: + with self._in_callback_mutex: + try: + on_publish(self, self._userdata, mid) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) + if not self.suppress_exceptions: + raise msg = self._out_messages.pop(mid) msg.info._set_as_published() @@ -3416,37 +3533,59 @@ class Client(object): def _handle_on_message(self, message): matched = False - with self._callback_mutex: - try: - topic = message.topic - except UnicodeDecodeError: - topic = None + try: + topic = message.topic + except UnicodeDecodeError: + topic = None + + on_message_callbacks = [] + with self._callback_mutex: if topic is not None: for callback in self._on_message_filtered.iter_match(message.topic): - with self._in_callback_mutex: - try: - callback(self, self._userdata, message) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, - 'Caught exception in user defined callback function %s: %s', - callback.__name__, - err - ) - if not self.suppress_exceptions: - raise - matched = True + on_message_callbacks.append(callback) - if matched == False and self.on_message: - with self._in_callback_mutex: - try: - self.on_message(self, self._userdata, message) - except Exception as err: - self._easy_log( - MQTT_LOG_ERR, 'Caught exception in on_message: %s', err) - if not self.suppress_exceptions: - raise + if len(on_message_callbacks) == 0: + on_message = self.on_message + else: + on_message = None + + for callback in on_message_callbacks: + with self._in_callback_mutex: + try: + callback(self, self._userdata, message) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, + 'Caught exception in user defined callback function %s: %s', + callback.__name__, + err + ) + if not self.suppress_exceptions: + raise + + if on_message: + with self._in_callback_mutex: + try: + on_message(self, self._userdata, message) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_message: %s', err) + if not self.suppress_exceptions: + raise + + + def _handle_on_connect_fail(self): + with self._callback_mutex: + on_connect_fail = self.on_connect_fail + + if on_connect_fail: + with self._in_callback_mutex: + try: + on_connect_fail(self, self._userdata) + except Exception as err: + self._easy_log( + MQTT_LOG_ERR, 'Caught exception in on_connect_fail: %s', err) def _thread_main(self): self.loop_forever(retry_first_connection=True) @@ -3538,18 +3677,12 @@ class Client(object): if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 2): # Have to short-circuit here because of unsupported source_address # param in earlier Python versions. - return socket.create_connection(addr, timeout=self._keepalive) + return socket.create_connection(addr, timeout=self._connect_timeout) if proxy: - return socks.create_connection(addr, source_address=source, timeout=self._keepalive, **proxy) + return socks.create_connection(addr, timeout=self._connect_timeout, source_address=source, **proxy) else: - return socket.create_connection(addr, source_address=source, timeout=self._keepalive) - - -# Compatibility class for easy porting from mosquitto.py. -class Mosquitto(Client): - def __init__(self, client_id="", clean_session=True, userdata=None): - super(Mosquitto, self).__init__(client_id, clean_session, userdata) + return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source) class WebsocketWrapper(object): @@ -3703,19 +3836,19 @@ class WebsocketWrapper(object): def _buffered_read(self, length): - # try to recv and strore needed bytes + # try to recv and store needed bytes wanted_bytes = length - (len(self._readbuffer) - self._readbuffer_head) if wanted_bytes > 0: data = self._socket.recv(wanted_bytes) if not data: - raise socket.error(errno.ECONNABORTED, 0) + raise ConnectionAbortedError else: self._readbuffer.extend(data) if len(data) < wanted_bytes: - raise socket.error(EAGAIN, 0) + raise BlockingIOError self._readbuffer_head += length return self._readbuffer[self._readbuffer_head - length:self._readbuffer_head] @@ -3791,19 +3924,17 @@ class WebsocketWrapper(object): WebsocketWrapper.OPCODE_PONG, payload, 0) self._socket.send(frame) - if opcode == WebsocketWrapper.OPCODE_BINARY and payload_length > 0: + # This isn't *proper* handling of continuation frames, but given + # that we only support binary frames, it is *probably* good enough. + if (opcode == WebsocketWrapper.OPCODE_BINARY or opcode == WebsocketWrapper.OPCODE_CONTINUATION) \ + and payload_length > 0: return result else: - raise socket.error(EAGAIN, 0) + raise BlockingIOError - except socket.error as err: - - if err.errno == errno.ECONNABORTED: - self.connected = False - return b'' - else: - # no more data - raise + except ConnectionError: + self.connected = False + return b'' def _send_impl(self, data): diff --git a/lib/paho/mqtt/matcher.py b/lib/paho/mqtt/matcher.py index 7fc966a3..01ce295c 100644 --- a/lib/paho/mqtt/matcher.py +++ b/lib/paho/mqtt/matcher.py @@ -1,9 +1,9 @@ class MQTTMatcher(object): """Intended to manage topic filters including wildcards. - Internally, MQTTMatcher use a prefix tree (trie) to store - values associated with filters, and has an iter_match() - method to iterate efficiently over all filters that match + Internally, MQTTMatcher use a prefix tree (trie) to store + values associated with filters, and has an iter_match() + method to iterate efficiently over all filters that match some topic name.""" class Node(object): @@ -55,7 +55,7 @@ class MQTTMatcher(object): del parent._children[k] def iter_match(self, topic): - """Return an iterator on all values associated with filters + """Return an iterator on all values associated with filters that match the :topic""" lst = topic.split('/') normal = not topic.startswith('$') diff --git a/lib/paho/mqtt/packettypes.py b/lib/paho/mqtt/packettypes.py index 7eb40697..2fd6a1b5 100644 --- a/lib/paho/mqtt/packettypes.py +++ b/lib/paho/mqtt/packettypes.py @@ -3,7 +3,7 @@ Copyright (c) 2017, 2019 IBM Corp. All rights reserved. This program and the accompanying materials - are made available under the terms of the Eclipse Public License v1.0 + are made available under the terms of the Eclipse Public License v2.0 and Eclipse Distribution License v1.0 which accompany this distribution. The Eclipse Public License is available at diff --git a/lib/paho/mqtt/properties.py b/lib/paho/mqtt/properties.py index 99f654a6..dbcf543e 100644 --- a/lib/paho/mqtt/properties.py +++ b/lib/paho/mqtt/properties.py @@ -3,7 +3,7 @@ Copyright (c) 2017, 2019 IBM Corp. All rights reserved. This program and the accompanying materials - are made available under the terms of the Eclipse Public License v1.0 + are made available under the terms of the Eclipse Public License v2.0 and Eclipse Distribution License v1.0 which accompany this distribution. The Eclipse Public License is available at @@ -16,7 +16,8 @@ ******************************************************************* """ -import sys, struct +import struct +import sys from .packettypes import PacketTypes @@ -268,6 +269,30 @@ class Properties(object): if self.packetType not in self.properties[self.getIdentFromName(name)][1]: raise MQTTException("Property %s does not apply to packet type %s" % (name, PacketTypes.Names[self.packetType])) + + # Check for forbidden values + if type(value) != type([]): + if name in ["ReceiveMaximum", "TopicAlias"] \ + and (value < 1 or value > 65535): + + raise MQTTException( + "%s property value must be in the range 1-65535" % (name)) + elif name in ["TopicAliasMaximum"] \ + and (value < 0 or value > 65535): + + raise MQTTException( + "%s property value must be in the range 0-65535" % (name)) + elif name in ["MaximumPacketSize", "SubscriptionIdentifier"] \ + and (value < 1 or value > 268435455): + + raise MQTTException( + "%s property value must be in the range 1-268435455" % (name)) + elif name in ["RequestResponseInformation", "RequestProblemInformation", "PayloadFormatIndicator"] \ + and (value != 0 and value != 1): + + raise MQTTException( + "%s property value must be 0 or 1" % (name)) + if self.allowsMultiple(name): if type(value) != type([]): value = [value] @@ -294,7 +319,11 @@ class Properties(object): for name in self.names.keys(): compressedName = name.replace(' ', '') if hasattr(self, compressedName): - data[compressedName] = getattr(self, compressedName) + val = getattr(self, compressedName) + if compressedName == 'CorrelationData' and isinstance(val, bytes): + data[compressedName] = val.hex() + else: + data[compressedName] = val return data def isEmpty(self): @@ -391,10 +420,10 @@ class Properties(object): buffer = buffer[VBIlen:] # strip the bytes used by the VBI propslenleft = propslen while propslenleft > 0: # properties length is 0 if there are none - identifier, VBIlen = VariableByteIntegers.decode( + identifier, VBIlen2 = VariableByteIntegers.decode( buffer) # property identifier - buffer = buffer[VBIlen:] # strip the bytes used by the VBI - propslenleft -= VBIlen + buffer = buffer[VBIlen2:] # strip the bytes used by the VBI + propslenleft -= VBIlen2 attr_type = self.properties[identifier][0] value, valuelen = self.readProperty( buffer, attr_type, propslenleft) diff --git a/lib/paho/mqtt/publish.py b/lib/paho/mqtt/publish.py index dcb34ff1..6d1589a7 100644 --- a/lib/paho/mqtt/publish.py +++ b/lib/paho/mqtt/publish.py @@ -1,7 +1,7 @@ # Copyright (c) 2014 Roger Light # # All rights reserved. This program and the accompanying materials -# are made available under the terms of the Eclipse Public License v1.0 +# are made available under the terms of the Eclipse Public License v2.0 # and Eclipse Distribution License v1.0 which accompany this distribution. # # The Eclipse Public License is available at @@ -21,13 +21,15 @@ broker, then disconnect and nothing else is required. from __future__ import absolute_import import collections + try: from collections.abc import Iterable except ImportError: from collections import Iterable -from . import client as paho from .. import mqtt +from . import client as paho + def _do_publish(client): """Internal function""" @@ -52,6 +54,9 @@ def _on_connect(client, userdata, flags, rc): else: raise mqtt.MQTTException(paho.connack_string(rc)) +def _on_connect_v5(client, userdata, flags, rc, properties): + """Internal v5 callback""" + _on_connect(client, userdata, flags, rc) def _on_publish(client, userdata, mid): """Internal callback""" @@ -131,11 +136,15 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, if not isinstance(msgs, Iterable): raise TypeError('msgs must be an iterable') + client = paho.Client(client_id=client_id, userdata=collections.deque(msgs), protocol=protocol, transport=transport) client.on_publish = _on_publish - client.on_connect = _on_connect + if protocol == mqtt.client.MQTTv5: + client.on_connect = _on_connect_v5 + else: + client.on_connect = _on_connect if proxy_args is not None: client.proxy_set(**proxy_args) diff --git a/lib/paho/mqtt/reasoncodes.py b/lib/paho/mqtt/reasoncodes.py index 12325bcb..c42e5ba9 100644 --- a/lib/paho/mqtt/reasoncodes.py +++ b/lib/paho/mqtt/reasoncodes.py @@ -3,7 +3,7 @@ Copyright (c) 2017, 2019 IBM Corp. All rights reserved. This program and the accompanying materials - are made available under the terms of the Eclipse Public License v1.0 + are made available under the terms of the Eclipse Public License v2.0 and Eclipse Distribution License v1.0 which accompany this distribution. The Eclipse Public License is available at @@ -17,13 +17,14 @@ """ import sys + from .packettypes import PacketTypes class ReasonCodes: """MQTT version 5.0 reason codes class. - See ReasonCodes.names for a list of possible numeric values along with their + See ReasonCodes.names for a list of possible numeric values along with their names and the packets to which they apply. """ @@ -37,7 +38,7 @@ class ReasonCodes: aName: the String name of the reason code to be created. Ignored if the identifier is set. - identifier: an integer value of the reason code to be created. + identifier: an integer value of the reason code to be created. """ @@ -120,7 +121,7 @@ class ReasonCodes: } if identifier == -1: if packetType == PacketTypes.DISCONNECT and aName == "Success": - aName = "Normal disconnection" + aName = "Normal disconnection" self.set(aName) else: self.value = identifier @@ -153,7 +154,7 @@ class ReasonCodes: if self.packetType in self.names[code][name]: identifier = code break - assert identifier != None, name + assert identifier is not None, name return identifier def set(self, name): @@ -188,4 +189,4 @@ class ReasonCodes: return self.getName() def pack(self): - return bytearray([self.value]) \ No newline at end of file + return bytearray([self.value]) diff --git a/lib/paho/mqtt/subscribe.py b/lib/paho/mqtt/subscribe.py index 8900a6bb..643df9c1 100644 --- a/lib/paho/mqtt/subscribe.py +++ b/lib/paho/mqtt/subscribe.py @@ -1,7 +1,7 @@ # Copyright (c) 2016 Roger Light # # All rights reserved. This program and the accompanying materials -# are made available under the terms of the Eclipse Public License v1.0 +# are made available under the terms of the Eclipse Public License v2.0 # and Eclipse Distribution License v1.0 which accompany this distribution. # # The Eclipse Public License is available at @@ -20,10 +20,11 @@ you to pass a callback for processing of messages. """ from __future__ import absolute_import -from . import client as paho from .. import mqtt +from . import client as paho -def _on_connect(client, userdata, flags, rc): + +def _on_connect_v5(client, userdata, flags, rc, properties): """Internal callback""" if rc != 0: raise mqtt.MQTTException(paho.connack_string(rc)) @@ -34,6 +35,10 @@ def _on_connect(client, userdata, flags, rc): else: client.subscribe(userdata['topics'], userdata['qos']) +def _on_connect(client, userdata, flags, rc): + """Internal v5 callback""" + _on_connect_v5(client, userdata, flags, rc, None) + def _on_message_callback(client, userdata, message): """Internal callback""" @@ -142,7 +147,10 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost", protocol=protocol, transport=transport, clean_session=clean_session) client.on_message = _on_message_callback - client.on_connect = _on_connect + if protocol == mqtt.client.MQTTv5: + client.on_connect = _on_connect_v5 + else: + client.on_connect = _on_connect if proxy_args is not None: client.proxy_set(**proxy_args) diff --git a/lib/paho/mqtt/subscribeoptions.py b/lib/paho/mqtt/subscribeoptions.py index f55e90a4..5b4f0733 100644 --- a/lib/paho/mqtt/subscribeoptions.py +++ b/lib/paho/mqtt/subscribeoptions.py @@ -3,7 +3,7 @@ Copyright (c) 2017, 2019 IBM Corp. All rights reserved. This program and the accompanying materials - are made available under the terms of the Eclipse Public License v1.0 + are made available under the terms of the Eclipse Public License v2.0 and Eclipse Distribution License v1.0 which accompany this distribution. The Eclipse Public License is available at