diff --git a/lib/paho/__init__.py b/lib/paho/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/paho/mqtt/__init__.py b/lib/paho/mqtt/__init__.py new file mode 100644 index 00000000..f9014086 --- /dev/null +++ b/lib/paho/mqtt/__init__.py @@ -0,0 +1 @@ +__version__ = "1.1" diff --git a/lib/paho/mqtt/client.py b/lib/paho/mqtt/client.py new file mode 100644 index 00000000..9bbde8a5 --- /dev/null +++ b/lib/paho/mqtt/client.py @@ -0,0 +1,2337 @@ +# Copyright (c) 2012-2014 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial API and implementation + +""" +This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging +protocol that is easy to implement and suitable for low powered devices. +""" +import errno +import platform +import random +import select +import socket +HAVE_SSL = True +try: + import ssl + cert_reqs = ssl.CERT_REQUIRED + tls_version = ssl.PROTOCOL_TLSv1 +except: + HAVE_SSL = False + cert_reqs = None + tls_version = None +import struct +import sys +import threading +import time +HAVE_DNS = True +try: + import dns.resolver +except ImportError: + HAVE_DNS = False + +if platform.system() == 'Windows': + EAGAIN = errno.WSAEWOULDBLOCK +else: + EAGAIN = errno.EAGAIN + +VERSION_MAJOR=1 +VERSION_MINOR=0 +VERSION_REVISION=0 +VERSION_NUMBER=(VERSION_MAJOR*1000000+VERSION_MINOR*1000+VERSION_REVISION) + +MQTTv31 = 3 +MQTTv311 = 4 + +if sys.version_info[0] < 3: + PROTOCOL_NAMEv31 = "MQIsdp" + PROTOCOL_NAMEv311 = "MQTT" +else: + PROTOCOL_NAMEv31 = b"MQIsdp" + PROTOCOL_NAMEv311 = b"MQTT" + +PROTOCOL_VERSION = 3 + +# Message types +CONNECT = 0x10 +CONNACK = 0x20 +PUBLISH = 0x30 +PUBACK = 0x40 +PUBREC = 0x50 +PUBREL = 0x60 +PUBCOMP = 0x70 +SUBSCRIBE = 0x80 +SUBACK = 0x90 +UNSUBSCRIBE = 0xA0 +UNSUBACK = 0xB0 +PINGREQ = 0xC0 +PINGRESP = 0xD0 +DISCONNECT = 0xE0 + +# Log levels +MQTT_LOG_INFO = 0x01 +MQTT_LOG_NOTICE = 0x02 +MQTT_LOG_WARNING = 0x04 +MQTT_LOG_ERR = 0x08 +MQTT_LOG_DEBUG = 0x10 + +# CONNACK codes +CONNACK_ACCEPTED = 0 +CONNACK_REFUSED_PROTOCOL_VERSION = 1 +CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 +CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 +CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 +CONNACK_REFUSED_NOT_AUTHORIZED = 5 + +# Connection state +mqtt_cs_new = 0 +mqtt_cs_connected = 1 +mqtt_cs_disconnecting = 2 +mqtt_cs_connect_async = 3 + +# Message state +mqtt_ms_invalid = 0 +mqtt_ms_publish= 1 +mqtt_ms_wait_for_puback = 2 +mqtt_ms_wait_for_pubrec = 3 +mqtt_ms_resend_pubrel = 4 +mqtt_ms_wait_for_pubrel = 5 +mqtt_ms_resend_pubcomp = 6 +mqtt_ms_wait_for_pubcomp = 7 +mqtt_ms_send_pubrec = 8 +mqtt_ms_queued = 9 + +# Error values +MQTT_ERR_AGAIN = -1 +MQTT_ERR_SUCCESS = 0 +MQTT_ERR_NOMEM = 1 +MQTT_ERR_PROTOCOL = 2 +MQTT_ERR_INVAL = 3 +MQTT_ERR_NO_CONN = 4 +MQTT_ERR_CONN_REFUSED = 5 +MQTT_ERR_NOT_FOUND = 6 +MQTT_ERR_CONN_LOST = 7 +MQTT_ERR_TLS = 8 +MQTT_ERR_PAYLOAD_SIZE = 9 +MQTT_ERR_NOT_SUPPORTED = 10 +MQTT_ERR_AUTH = 11 +MQTT_ERR_ACL_DENIED = 12 +MQTT_ERR_UNKNOWN = 13 +MQTT_ERR_ERRNO = 14 + +if sys.version_info[0] < 3: + sockpair_data = "0" +else: + sockpair_data = b"0" + +def error_string(mqtt_errno): + """Return the error string associated with an mqtt error number.""" + if mqtt_errno == MQTT_ERR_SUCCESS: + return "No error." + elif mqtt_errno == MQTT_ERR_NOMEM: + return "Out of memory." + elif mqtt_errno == MQTT_ERR_PROTOCOL: + return "A network protocol error occurred when communicating with the broker." + elif mqtt_errno == MQTT_ERR_INVAL: + return "Invalid function arguments provided." + elif mqtt_errno == MQTT_ERR_NO_CONN: + return "The client is not currently connected." + elif mqtt_errno == MQTT_ERR_CONN_REFUSED: + return "The connection was refused." + elif mqtt_errno == MQTT_ERR_NOT_FOUND: + return "Message not found (internal error)." + elif mqtt_errno == MQTT_ERR_CONN_LOST: + return "The connection was lost." + elif mqtt_errno == MQTT_ERR_TLS: + return "A TLS error occurred." + elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE: + return "Payload too large." + elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED: + return "This feature is not supported." + elif mqtt_errno == MQTT_ERR_AUTH: + return "Authorisation failed." + elif mqtt_errno == MQTT_ERR_ACL_DENIED: + return "Access denied by ACL." + elif mqtt_errno == MQTT_ERR_UNKNOWN: + return "Unknown error." + elif mqtt_errno == MQTT_ERR_ERRNO: + return "Error defined by errno." + else: + return "Unknown error." + + +def connack_string(connack_code): + """Return the string associated with a CONNACK result.""" + if connack_code == 0: + return "Connection Accepted." + elif connack_code == 1: + return "Connection Refused: unacceptable protocol version." + elif connack_code == 2: + return "Connection Refused: identifier rejected." + elif connack_code == 3: + return "Connection Refused: broker unavailable." + elif connack_code == 4: + return "Connection Refused: bad user name or password." + elif connack_code == 5: + return "Connection Refused: not authorised." + else: + return "Connection Refused: unknown reason." + + +def topic_matches_sub(sub, topic): + """Check whether a topic matches a subscription. + + For example: + + foo/bar would match the subscription foo/# or +/bar + non/matching would not match the subscription non/+/+ + """ + result = True + multilevel_wildcard = False + + slen = len(sub) + tlen = len(topic) + + if slen > 0 and tlen > 0: + if (sub[0] == '$' and topic[0] != '$') or (topic[0] == '$' and sub[0] != '$'): + return False + + spos = 0 + tpos = 0 + + while spos < slen and tpos < tlen: + if sub[spos] == topic[tpos]: + if tpos == tlen-1: + # Check for e.g. foo matching foo/# + if spos == slen-3 and sub[spos+1] == '/' and sub[spos+2] == '#': + result = True + multilevel_wildcard = True + break + + spos += 1 + tpos += 1 + + if tpos == tlen and spos == slen-1 and sub[spos] == '+': + spos += 1 + result = True + break + else: + if sub[spos] == '+': + spos += 1 + while tpos < tlen and topic[tpos] != '/': + tpos += 1 + if tpos == tlen and spos == slen: + result = True + break + + elif sub[spos] == '#': + multilevel_wildcard = True + if spos+1 != slen: + result = False + break + else: + result = True + break + + else: + result = False + break + + if not multilevel_wildcard and (tpos < tlen or spos < slen): + result = False + + return result + + +def _socketpair_compat(): + """TCP/IP socketpair including Windows support""" + listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) + listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listensock.bind(("127.0.0.1", 0)) + listensock.listen(1) + + iface, port = listensock.getsockname() + sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) + sock1.setblocking(0) + try: + sock1.connect(("localhost", port)) + except socket.error as err: + if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: + raise + sock2, address = listensock.accept() + sock2.setblocking(0) + listensock.close() + return (sock1, sock2) + + +class MQTTMessage: + """ This is a class that describes an incoming message. It is passed to the + on_message callback as the message parameter. + + Members: + + topic : String. topic that the message was published on. + payload : String/bytes the message payload. + qos : Integer. The message Quality of Service 0, 1 or 2. + retain : Boolean. If true, the message is a retained message and not fresh. + mid : Integer. The message id. + """ + def __init__(self): + self.timestamp = 0 + self.state = mqtt_ms_invalid + self.dup = False + self.mid = 0 + self.topic = "" + self.payload = None + self.qos = 0 + self.retain = False + + +class Client(object): + """MQTT version 3.1/3.1.1 client class. + + This is the main class for use communicating with an MQTT broker. + + General usage flow: + + * Use connect()/connect_async() to connect to a broker + * Call loop() frequently to maintain network traffic flow with the broker + * Or use loop_start() to set a thread running to call loop() for you. + * Or use loop_forever() to handle calling loop() for you in a blocking + * function. + * Use subscribe() to subscribe to a topic and receive messages + * Use publish() to send messages + * Use disconnect() to disconnect from the broker + + Data returned from the broker is made available with the use of callback + functions as described below. + + Callbacks + ========= + + A number of callback functions are available to receive data back from the + broker. To use a callback, define a function and then assign it to the + client: + + def on_connect(client, userdata, flags, rc): + print("Connection returned " + str(rc)) + + client.on_connect = on_connect + + All of the callbacks as described below have a "client" and an "userdata" + argument. "client" is the Client instance that is calling the callback. + "userdata" is user data of any type and can be set when creating a new client + instance or with user_data_set(userdata). + + The callbacks: + + on_connect(client, userdata, flags, rc): called when the broker responds to our connection + request. + flags is a dict that contains response flags from the broker: + flags['session present'] - this flag is useful for clients that are + using clean session set to 0 only. If a client with clean + session=0, that reconnects to a broker that it has previously + connected to, this flag indicates whether the broker still has the + session information for the client. If 1, the session still exists. + The value of rc determines success or not: + 0: Connection successful + 1: Connection refused - incorrect protocol version + 2: Connection refused - invalid client identifier + 3: Connection refused - server unavailable + 4: Connection refused - bad username or password + 5: Connection refused - not authorised + 6-255: Currently unused. + + on_disconnect(client, userdata, rc): called when the client disconnects from the broker. + The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS + (0), the callback was called in response to a disconnect() call. If any + other value the disconnection was unexpected, such as might be caused by + a network error. + + on_message(client, userdata, message): called when a message has been received on a + topic that the client subscribes to. The message variable is a + MQTTMessage that describes all of the message parameters. + + on_publish(client, userdata, mid): called when a message that was to be sent using the + publish() call has completed transmission to the broker. For messages + with QoS levels 1 and 2, this means that the appropriate handshakes have + completed. For QoS 0, this simply means that the message has left the + client. The mid variable matches the mid variable returned from the + corresponding publish() call, to allow outgoing messages to be tracked. + This callback is important because even if the publish() call returns + success, it does not always mean that the message has been sent. + + on_subscribe(client, userdata, mid, granted_qos): called when the broker responds to a + subscribe request. The mid variable matches the mid variable returned + from the corresponding subscribe() call. The granted_qos variable is a + list of integers that give the QoS level the broker has granted for each + of the different subscription requests. + + on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe + request. The mid variable matches the mid variable returned from the + corresponding unsubscribe() call. + + on_log(client, userdata, level, buf): called when the client has log information. Define + to allow debugging. The level variable gives the severity of the message + and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, + MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. + + """ + def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31): + """client_id is the unique client id string used when connecting to the + broker. If client_id is zero length or None, then one will be randomly + generated. In this case, clean_session must be True. If this is not the + case a ValueError will be raised. + + clean_session is a boolean that determines the client type. If True, + the broker will remove all information about this client when it + disconnects. If False, the client is a persistent client and + subscription information and queued messages will be retained when the + client disconnects. + Note that a client will never discard its own outgoing messages on + disconnect. Calling connect() or reconnect() will cause the messages to + be resent. Use reinitialise() to reset a client to its original state. + + userdata is user defined data of any type that is passed as the "userdata" + parameter to callbacks. It may be updated at a later point with the + user_data_set() function. + + The protocol argument allows explicit setting of the MQTT version to + use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or + paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1. If the + broker reports that the client connected with an invalid protocol + version, the client will automatically attempt to reconnect using v3.1 + instead. + """ + if not clean_session and (client_id == "" or client_id is None): + raise ValueError('A client id must be provided if clean session is False.') + + self._protocol = protocol + self._userdata = userdata + self._sock = None + self._sockpairR, self._sockpairW = _socketpair_compat() + self._keepalive = 60 + self._message_retry = 20 + self._last_retry_check = 0 + self._clean_session = clean_session + if client_id == "" or client_id is None: + self._client_id = "paho/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) + else: + self._client_id = client_id + + self._username = "" + self._password = "" + self._in_packet = { + "command": 0, + "have_remaining": 0, + "remaining_count": [], + "remaining_mult": 1, + "remaining_length": 0, + "packet": b"", + "to_process": 0, + "pos": 0} + self._out_packet = [] + self._current_out_packet = None + self._last_msg_in = time.time() + self._last_msg_out = time.time() + self._ping_t = 0 + self._last_mid = 0 + self._state = mqtt_cs_new + self._out_messages = [] + self._in_messages = [] + self._max_inflight_messages = 20 + self._inflight_messages = 0 + self._will = False + self._will_topic = "" + self._will_payload = None + self._will_qos = 0 + self._will_retain = False + self.on_disconnect = None + self.on_connect = None + self.on_publish = None + self.on_message = None + self.on_message_filtered = [] + self.on_subscribe = None + self.on_unsubscribe = None + self.on_log = None + self._host = "" + self._port = 1883 + self._bind_address = "" + self._in_callback = False + self._strict_protocol = False + self._callback_mutex = threading.Lock() + self._state_mutex = threading.Lock() + self._out_packet_mutex = threading.Lock() + self._current_out_packet_mutex = threading.Lock() + self._msgtime_mutex = threading.Lock() + self._out_message_mutex = threading.Lock() + self._in_message_mutex = threading.Lock() + self._thread = None + self._thread_terminate = False + self._ssl = None + self._tls_certfile = None + self._tls_keyfile = None + self._tls_ca_certs = None + self._tls_cert_reqs = None + self._tls_ciphers = None + self._tls_version = tls_version + self._tls_insecure = False + + def __del__(self): + pass + + def reinitialise(self, client_id="", clean_session=True, userdata=None): + if self._ssl: + self._ssl.close() + self._ssl = None + self._sock = None + elif self._sock: + self._sock.close() + self._sock = None + if self._sockpairR: + self._sockpairR.close() + self._sockpairR = None + if self._sockpairW: + self._sockpairW.close() + self._sockpairW = None + + self.__init__(client_id, clean_session, userdata) + + def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None): + """Configure network encryption and authentication options. Enables SSL/TLS support. + + ca_certs : a string path to the Certificate Authority certificate files + that are to be treated as trusted by this client. If this is the only + option given then the client will operate in a similar manner to a web + browser. That is to say it will require the broker to have a + certificate signed by the Certificate Authorities in ca_certs and will + communicate using TLS v1, but will not attempt any form of + authentication. This provides basic network encryption but may not be + sufficient depending on how the broker is configured. + + certfile and keyfile are strings pointing to the PEM encoded client + certificate and private keys respectively. If these arguments are not + None then they will be used as client information for TLS based + authentication. Support for this feature is broker dependent. Note + that if either of these files in encrypted and needs a password to + decrypt it, Python will ask for the password at the command line. It is + not currently possible to define a callback to provide the password. + + cert_reqs allows the certificate requirements that the client imposes + on the broker to be changed. By default this is ssl.CERT_REQUIRED, + which means that the broker must provide a certificate. See the ssl + pydoc for more information on this parameter. + + tls_version allows the version of the SSL/TLS protocol used to be + specified. By default TLS v1 is used. Previous versions (all versions + beginning with SSL) are possible but not recommended due to possible + security problems. + + ciphers is a string specifying which encryption ciphers are allowable + for this connection, or None to use the defaults. See the ssl pydoc for + more information. + + Must be called before connect() or connect_async().""" + if HAVE_SSL is False: + raise ValueError('This platform has no SSL/TLS.') + + if sys.version < '2.7': + raise ValueError('Python 2.7 is the minimum supported version for TLS.') + + if ca_certs is None: + raise ValueError('ca_certs must not be None.') + + try: + f = open(ca_certs, "r") + except IOError as err: + raise IOError(ca_certs+": "+err.strerror) + else: + f.close() + if certfile is not None: + try: + f = open(certfile, "r") + except IOError as err: + raise IOError(certfile+": "+err.strerror) + else: + f.close() + if keyfile is not None: + try: + f = open(keyfile, "r") + except IOError as err: + raise IOError(keyfile+": "+err.strerror) + else: + f.close() + + self._tls_ca_certs = ca_certs + self._tls_certfile = certfile + self._tls_keyfile = keyfile + self._tls_cert_reqs = cert_reqs + self._tls_version = tls_version + self._tls_ciphers = ciphers + + def tls_insecure_set(self, value): + """Configure verification of the server hostname in the server certificate. + + If value is set to true, it is impossible to guarantee that the host + you are connecting to is not impersonating your server. This can be + useful in initial server testing, but makes it possible for a malicious + third party to impersonate your server through DNS spoofing, for + example. + + Do not use this function in a real system. Setting value to true means + there is no point using encryption. + + Must be called before connect().""" + if HAVE_SSL is False: + raise ValueError('This platform has no SSL/TLS.') + + self._tls_insecure = value + + def connect(self, host, port=1883, keepalive=60, bind_address=""): + """Connect to a remote broker. + + host is the hostname or IP address of the remote broker. + port is the network port of the server host to connect to. Defaults to + 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you + are using tls_set() the port may need providing. + keepalive: Maximum period in seconds between communications with the + broker. If no other messages are being exchanged, this controls the + rate at which the client will send ping messages to the broker. + """ + self.connect_async(host, port, keepalive, bind_address) + return self.reconnect() + + def connect_srv(self, domain=None, keepalive=60, bind_address=""): + """Connect to a remote broker. + + domain is the DNS domain to search for SRV records; if None, + try to determine local domain name. + keepalive and bind_address are as for connect() + """ + + if HAVE_DNS is False: + raise ValueError('No DNS resolver library found.') + + if domain is None: + domain = socket.getfqdn() + domain = domain[domain.find('.') + 1:] + + try: + rr = '_mqtt._tcp.%s' % domain + if self._ssl is not None: + # IANA specifies secure-mqtt (not mqtts) for port 8883 + rr = '_secure-mqtt._tcp.%s' % domain + answers = [] + for answer in dns.resolver.query(rr, dns.rdatatype.SRV): + addr = answer.target.to_text()[:-1] + answers.append((addr, answer.port, answer.priority, answer.weight)) + except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers): + raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain)) + + # FIXME: doesn't account for weight + for answer in answers: + host, port, prio, weight = answer + + try: + return self.connect(host, port, keepalive, bind_address) + except: + pass + + raise ValueError("No SRV hosts responded") + + def connect_async(self, host, port=1883, keepalive=60, bind_address=""): + """Connect to a remote broker asynchronously. This is a non-blocking + connect call that can be used with loop_start() to provide very quick + start. + + host is the hostname or IP address of the remote broker. + port is the network port of the server host to connect to. Defaults to + 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you + are using tls_set() the port may need providing. + keepalive: Maximum period in seconds between communications with the + broker. If no other messages are being exchanged, this controls the + rate at which the client will send ping messages to the broker. + """ + if host is None or len(host) == 0: + raise ValueError('Invalid host.') + if port <= 0: + raise ValueError('Invalid port number.') + if keepalive < 0: + raise ValueError('Keepalive must be >=0.') + if bind_address != "" and bind_address is not None: + if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2): + raise ValueError('bind_address requires Python 2.7 or 3.2.') + + self._host = host + self._port = port + self._keepalive = keepalive + self._bind_address = bind_address + + self._state_mutex.acquire() + self._state = mqtt_cs_connect_async + self._state_mutex.release() + + def reconnect(self): + """Reconnect the client after a disconnect. Can only be called after + connect()/connect_async().""" + if len(self._host) == 0: + raise ValueError('Invalid host.') + if self._port <= 0: + raise ValueError('Invalid port number.') + + self._in_packet = { + "command": 0, + "have_remaining": 0, + "remaining_count": [], + "remaining_mult": 1, + "remaining_length": 0, + "packet": b"", + "to_process": 0, + "pos": 0} + + self._out_packet_mutex.acquire() + self._out_packet = [] + self._out_packet_mutex.release() + + self._current_out_packet_mutex.acquire() + self._current_out_packet = None + self._current_out_packet_mutex.release() + + self._msgtime_mutex.acquire() + self._last_msg_in = time.time() + self._last_msg_out = time.time() + self._msgtime_mutex.release() + + self._ping_t = 0 + self._state_mutex.acquire() + self._state = mqtt_cs_new + self._state_mutex.release() + if self._ssl: + self._ssl.close() + self._ssl = None + self._sock = None + elif self._sock: + self._sock.close() + self._sock = None + + # Put messages in progress in a valid state. + self._messages_reconnect_reset() + + try: + if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2): + sock = socket.create_connection((self._host, self._port)) + else: + sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0)) + except socket.error as err: + if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: + raise + + if self._tls_ca_certs is not None: + self._ssl = ssl.wrap_socket( + sock, + certfile=self._tls_certfile, + keyfile=self._tls_keyfile, + ca_certs=self._tls_ca_certs, + cert_reqs=self._tls_cert_reqs, + ssl_version=self._tls_version, + ciphers=self._tls_ciphers) + + if self._tls_insecure is False: + if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 2): + self._tls_match_hostname() + else: + ssl.match_hostname(self._ssl.getpeercert(), self._host) + + self._sock = sock + self._sock.setblocking(0) + + return self._send_connect(self._keepalive, self._clean_session) + + def loop(self, timeout=1.0, max_packets=1): + """Process network events. + + This function must be called regularly to ensure communication with the + broker is carried out. It calls select() on the network socket to wait + for network events. If incoming data is present it will then be + processed. Outgoing commands, from e.g. publish(), are normally sent + immediately that their function is called, but this is not always + possible. loop() will also attempt to send any remaining outgoing + messages, which also includes commands that are part of the flow for + messages with QoS>0. + + timeout: The time in seconds to wait for incoming/outgoing network + traffic before timing out and returning. + max_packets: Not currently used. + + Returns MQTT_ERR_SUCCESS on success. + Returns >0 on error. + + A ValueError will be raised if timeout < 0""" + if timeout < 0.0: + raise ValueError('Invalid timeout.') + + self._current_out_packet_mutex.acquire() + self._out_packet_mutex.acquire() + if self._current_out_packet is None and len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + + if self._current_out_packet: + wlist = [self.socket()] + else: + wlist = [] + self._out_packet_mutex.release() + self._current_out_packet_mutex.release() + + # sockpairR is used to break out of select() before the timeout, on a + # call to publish() etc. + rlist = [self.socket(), self._sockpairR] + try: + socklist = select.select(rlist, wlist, [], timeout) + except TypeError: + # Socket isn't correct type, in likelihood connection is lost + return MQTT_ERR_CONN_LOST + except ValueError: + # Can occur if we just reconnected but rlist/wlist contain a -1 for + # some reason. + return MQTT_ERR_CONN_LOST + except: + return MQTT_ERR_UNKNOWN + + if self.socket() in socklist[0]: + rc = self.loop_read(max_packets) + if rc or (self._ssl is None and self._sock is None): + return rc + + if self._sockpairR in socklist[0]: + # Stimulate output write even though we didn't ask for it, because + # at that point the publish or other command wasn't present. + socklist[1].insert(0, self.socket()) + # Clear sockpairR - only ever a single byte written. + try: + self._sockpairR.recv(1) + except socket.error as err: + if err.errno != EAGAIN: + raise + + if self.socket() in socklist[1]: + rc = self.loop_write(max_packets) + if rc or (self._ssl is None and self._sock is None): + return rc + + return self.loop_misc() + + def publish(self, topic, payload=None, qos=0, retain=False): + """Publish a message on a topic. + + This causes a message to be sent to the broker and subsequently from + the broker to any clients subscribing to matching topics. + + topic: The topic that the message should be published on. + payload: The actual message to send. If not given, or set to None a + zero length message will be used. Passing an int or float will result + in the payload being converted to a string representing that number. If + you wish to send a true int/float, use struct.pack() to create the + payload you require. + qos: The quality of service level to use. + retain: If set to true, the message will be set as the "last known + good"/retained message for the topic. + + Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to + indicate success or MQTT_ERR_NO_CONN if the client is not currently + connected. mid is the message ID for the publish request. The mid + value can be used to track the publish request by checking against the + mid argument in the on_publish() callback if it is defined. + + A ValueError will be raised if topic is None, has zero length or is + invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if + the length of the payload is greater than 268435455 bytes.""" + if topic is None or len(topic) == 0: + raise ValueError('Invalid topic.') + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if isinstance(payload, str) or isinstance(payload, bytearray): + local_payload = payload + elif sys.version_info[0] < 3 and isinstance(payload, unicode): + local_payload = payload + elif isinstance(payload, int) or isinstance(payload, float): + local_payload = str(payload) + elif payload is None: + local_payload = None + else: + raise TypeError('payload must be a string, bytearray, int, float or None.') + + if local_payload is not None and len(local_payload) > 268435455: + raise ValueError('Payload too large.') + + if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS: + raise ValueError('Publish topic cannot contain wildcards.') + + local_mid = self._mid_generate() + + if qos == 0: + rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) + return (rc, local_mid) + else: + message = MQTTMessage() + message.timestamp = time.time() + + message.mid = local_mid + message.topic = topic + if local_payload is None or len(local_payload) == 0: + message.payload = None + else: + message.payload = local_payload + + message.qos = qos + message.retain = retain + message.dup = False + + self._out_message_mutex.acquire() + self._out_messages.append(message) + if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: + self._inflight_messages = self._inflight_messages+1 + if qos == 1: + message.state = mqtt_ms_wait_for_puback + elif qos == 2: + message.state = mqtt_ms_wait_for_pubrec + self._out_message_mutex.release() + + rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) + + # remove from inflight messages so it will be send after a connection is made + if rc is MQTT_ERR_NO_CONN: + with self._out_message_mutex: + self._inflight_messages -= 1 + message.state = mqtt_ms_publish + + return (rc, local_mid) + else: + message.state = mqtt_ms_queued; + self._out_message_mutex.release() + return (MQTT_ERR_SUCCESS, local_mid) + + def username_pw_set(self, username, password=None): + """Set a username and optionally a password for broker authentication. + + Must be called before connect() to have any effect. + Requires a broker that supports MQTT v3.1. + + username: The username to authenticate with. Need have no relationship to the client id. + password: The password to authenticate with. Optional, set to None if not required. + """ + self._username = username.encode('utf-8') + self._password = password + + def disconnect(self): + """Disconnect a connected client from the broker.""" + self._state_mutex.acquire() + self._state = mqtt_cs_disconnecting + self._state_mutex.release() + + if self._sock is None and self._ssl is None: + return MQTT_ERR_NO_CONN + + return self._send_disconnect() + + def subscribe(self, topic, qos=0): + """Subscribe the client to one or more topics. + + This function may be called in three different ways: + + Simple string and integer + ------------------------- + e.g. subscribe("my/topic", 2) + + topic: A string specifying the subscription topic to subscribe to. + qos: The desired quality of service level for the subscription. + Defaults to 0. + + String and integer tuple + ------------------------ + e.g. subscribe(("my/topic", 1)) + + topic: A tuple of (topic, qos). Both topic and qos must be present in + the tuple. + qos: Not used. + + List of string and integer tuples + ------------------------ + e.g. subscribe([("my/topic", 0), ("another/topic", 2)]) + + This allows multiple topic subscriptions in a single SUBSCRIPTION + command, which is more efficient than using multiple calls to + subscribe(). + + topic: A list of tuple of format (topic, qos). Both topic and qos must + be present in all of the tuples. + qos: Not used. + + The function returns a tuple (result, mid), where result is + MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the + client is not currently connected. mid is the message ID for the + subscribe request. The mid value can be used to track the subscribe + request by checking against the mid argument in the on_subscribe() + callback if it is defined. + + Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has + zero string length, or if topic is not a string, tuple or list. + """ + topic_qos_list = None + if isinstance(topic, str): + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if topic is None or len(topic) == 0: + raise ValueError('Invalid topic.') + topic_qos_list = [(topic.encode('utf-8'), qos)] + elif isinstance(topic, tuple): + if topic[1]<0 or topic[1]>2: + raise ValueError('Invalid QoS level.') + if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str): + raise ValueError('Invalid topic.') + topic_qos_list = [(topic[0].encode('utf-8'), topic[1])] + elif isinstance(topic, list): + topic_qos_list = [] + for t in topic: + if t[1]<0 or t[1]>2: + raise ValueError('Invalid QoS level.') + if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str): + raise ValueError('Invalid topic.') + topic_qos_list.append((t[0].encode('utf-8'), t[1])) + + if topic_qos_list is None: + raise ValueError("No topic specified, or incorrect topic type.") + + if self._sock is None and self._ssl is None: + return (MQTT_ERR_NO_CONN, None) + + return self._send_subscribe(False, topic_qos_list) + + def unsubscribe(self, topic): + """Unsubscribe the client from one or more topics. + + topic: A single string, or list of strings that are the subscription + topics to unsubscribe from. + + Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS + to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not + currently connected. + mid is the message ID for the unsubscribe request. The mid value can be + used to track the unsubscribe request by checking against the mid + argument in the on_unsubscribe() callback if it is defined. + + Raises a ValueError if topic is None or has zero string length, or is + not a string or list. + """ + topic_list = None + if topic is None: + raise ValueError('Invalid topic.') + if isinstance(topic, str): + if len(topic) == 0: + raise ValueError('Invalid topic.') + topic_list = [topic.encode('utf-8')] + elif isinstance(topic, list): + topic_list = [] + for t in topic: + if len(t) == 0 or not isinstance(t, str): + raise ValueError('Invalid topic.') + topic_list.append(t.encode('utf-8')) + + if topic_list is None: + raise ValueError("No topic specified, or incorrect topic type.") + + if self._sock is None and self._ssl is None: + return (MQTT_ERR_NO_CONN, None) + + return self._send_unsubscribe(False, topic_list) + + def loop_read(self, max_packets=1): + """Process read network events. Use in place of calling loop() if you + wish to handle your client reads as part of your own application. + + Use socket() to obtain the client socket to call select() or equivalent + on. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock is None and self._ssl is None: + return MQTT_ERR_NO_CONN + + max_packets = len(self._out_messages) + len(self._in_messages) + if max_packets < 1: + max_packets = 1 + + for i in range(0, max_packets): + rc = self._packet_read() + if rc > 0: + return self._loop_rc_handle(rc) + elif rc == MQTT_ERR_AGAIN: + return MQTT_ERR_SUCCESS + return MQTT_ERR_SUCCESS + + def loop_write(self, max_packets=1): + """Process read network events. Use in place of calling loop() if you + wish to handle your client reads as part of your own application. + + Use socket() to obtain the client socket to call select() or equivalent + on. + + Use want_write() to determine if there is data waiting to be written. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock is None and self._ssl is None: + return MQTT_ERR_NO_CONN + + max_packets = len(self._out_packet) + 1 + if max_packets < 1: + max_packets = 1 + + for i in range(0, max_packets): + rc = self._packet_write() + if rc > 0: + return self._loop_rc_handle(rc) + elif rc == MQTT_ERR_AGAIN: + return MQTT_ERR_SUCCESS + return MQTT_ERR_SUCCESS + + def want_write(self): + """Call to determine if there is network data waiting to be written. + Useful if you are calling select() yourself rather than using loop(). + """ + if self._current_out_packet or len(self._out_packet) > 0: + return True + else: + return False + + def loop_misc(self): + """Process miscellaneous network events. Use in place of calling loop() if you + wish to call select() or equivalent on. + + Do not use if you are using the threaded interface loop_start().""" + if self._sock is None and self._ssl is None: + return MQTT_ERR_NO_CONN + + now = time.time() + self._check_keepalive() + if self._last_retry_check+1 < now: + # Only check once a second at most + self._message_retry_check() + self._last_retry_check = now + + if self._ping_t > 0 and now - self._ping_t >= self._keepalive: + # client->ping_t != 0 means we are waiting for a pingresp. + # This hasn't happened in the keepalive time so we should disconnect. + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + self._callback_mutex.acquire() + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS + else: + rc = 1 + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + self._callback_mutex.release() + return MQTT_ERR_CONN_LOST + + return MQTT_ERR_SUCCESS + + def max_inflight_messages_set(self, inflight): + """Set the maximum number of messages with QoS>0 that can be part way + through their network flow at once. Defaults to 20.""" + if inflight < 0: + raise ValueError('Invalid inflight.') + self._max_inflight_messages = inflight + + def message_retry_set(self, retry): + """Set the timeout in seconds before a message with QoS>0 is retried. + 20 seconds by default.""" + if retry < 0: + raise ValueError('Invalid retry.') + + self._message_retry = retry + + def user_data_set(self, userdata): + """Set the user data variable passed to callbacks. May be any data type.""" + self._userdata = userdata + + def will_set(self, topic, payload=None, qos=0, retain=False): + """Set a Will to be sent by the broker in case the client disconnects unexpectedly. + + This must be called before connect() to have any effect. + + topic: The topic that the will message should be published on. + payload: The message to send as a will. If not given, or set to None a + zero length message will be used as the will. Passing an int or float + will result in the payload being converted to a string representing + that number. If you wish to send a true int/float, use struct.pack() to + create the payload you require. + qos: The quality of service level to use for the will. + retain: If set to true, the will message will be set as the "last known + good"/retained message for the topic. + + Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has + zero string length. + """ + if topic is None or len(topic) == 0: + raise ValueError('Invalid topic.') + if qos<0 or qos>2: + raise ValueError('Invalid QoS level.') + if isinstance(payload, str): + self._will_payload = payload.encode('utf-8') + elif isinstance(payload, bytearray): + self._will_payload = payload + elif isinstance(payload, int) or isinstance(payload, float): + self._will_payload = str(payload) + elif payload is None: + self._will_payload = None + else: + raise TypeError('payload must be a string, bytearray, int, float or None.') + + self._will = True + self._will_topic = topic.encode('utf-8') + self._will_qos = qos + self._will_retain = retain + + def will_clear(self): + """ Removes a will that was previously configured with will_set(). + + Must be called before connect() to have any effect.""" + self._will = False + self._will_topic = "" + self._will_payload = None + self._will_qos = 0 + self._will_retain = False + + def socket(self): + """Return the socket or ssl object for this client.""" + if self._ssl: + return self._ssl + else: + return self._sock + + def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False): + """This function call loop() for you in an infinite blocking loop. It + is useful for the case where you only want to run the MQTT client loop + in your program. + + loop_forever() will handle reconnecting for you. If you call + disconnect() in a callback it will return. + + + timeout: The time in seconds to wait for incoming/outgoing network + traffic before timing out and returning. + max_packets: Not currently used. + retry_first_connection: Should the first connection attempt be retried on failure. + + Raises socket.error on first connection failures unless retry_first_connection=True + """ + + run = True + + while run: + if self._state == mqtt_cs_connect_async: + try: + self.reconnect() + except socket.error: + if not retry_first_connection: + raise + self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying") + time.sleep(1) + else: + break + + while run: + rc = MQTT_ERR_SUCCESS + while rc == MQTT_ERR_SUCCESS: + rc = self.loop(timeout, max_packets) + # We don't need to worry about locking here, because we've + # either called loop_forever() when in single threaded mode, or + # in multi threaded mode when loop_stop() has been called and + # so no other threads can access _current_out_packet, + # _out_packet or _messages. + if (self._thread_terminate is True + and self._current_out_packet is None + and len(self._out_packet) == 0 + and len(self._out_messages) == 0): + + rc = 1 + run = False + + self._state_mutex.acquire() + if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True: + run = False + self._state_mutex.release() + else: + self._state_mutex.release() + time.sleep(1) + + self._state_mutex.acquire() + if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True: + run = False + self._state_mutex.release() + else: + self._state_mutex.release() + try: + self.reconnect() + except socket.error as err: + pass + + return rc + + def loop_start(self): + """This is part of the threaded client interface. Call this once to + start a new thread to process network traffic. This provides an + alternative to repeatedly calling loop() yourself. + """ + if self._thread is not None: + return MQTT_ERR_INVAL + + self._thread_terminate = False + self._thread = threading.Thread(target=self._thread_main) + self._thread.daemon = True + self._thread.start() + + def loop_stop(self, force=False): + """This is part of the threaded client interface. Call this once to + stop the network thread previously created with loop_start(). This call + will block until the network thread finishes. + + The force parameter is currently ignored. + """ + if self._thread is None: + return MQTT_ERR_INVAL + + self._thread_terminate = True + self._thread.join() + self._thread = None + + def message_callback_add(self, sub, callback): + """Register a message callback for a specific topic. + Messages that match 'sub' will be passed to 'callback'. Any + non-matching messages will be passed to the default on_message + callback. + + Call multiple times with different 'sub' to define multiple topic + specific callbacks. + + Topic specific callbacks may be removed with + message_callback_remove().""" + if callback is None or sub is None: + raise ValueError("sub and callback must both be defined.") + + self._callback_mutex.acquire() + + for i in range(0, len(self.on_message_filtered)): + if self.on_message_filtered[i][0] == sub: + self.on_message_filtered[i] = (sub, callback) + self._callback_mutex.release() + return + + self.on_message_filtered.append((sub, callback)) + self._callback_mutex.release() + + def message_callback_remove(self, sub): + """Remove a message callback previously registered with + message_callback_add().""" + if sub is None: + raise ValueError("sub must defined.") + + self._callback_mutex.acquire() + for i in range(0, len(self.on_message_filtered)): + if self.on_message_filtered[i][0] == sub: + self.on_message_filtered.pop(i) + self._callback_mutex.release() + return + self._callback_mutex.release() + + # ============================================================ + # Private functions + # ============================================================ + + def _loop_rc_handle(self, rc): + if rc: + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + self._state_mutex.acquire() + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS + self._state_mutex.release() + self._callback_mutex.acquire() + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + + self._callback_mutex.release() + return rc + + def _packet_read(self): + # This gets called if pselect() indicates that there is network data + # available - ie. at least one byte. What we do depends on what data we + # already have. + # If we've not got a command, attempt to read one and save it. This should + # always work because it's only a single byte. + # Then try to read the remaining length. This may fail because it is may + # be more than one byte - will need to save data pending next read if it + # does fail. + # Then try to read the remaining payload, where 'payload' here means the + # combined variable header and actual payload. This is the most likely to + # fail due to longer length, so save current data and current position. + # After all data is read, send to _mqtt_handle_packet() to deal with. + # Finally, free the memory and reset everything to starting conditions. + if self._in_packet['command'] == 0: + try: + if self._ssl: + command = self._ssl.read(1) + else: + command = self._sock.recv(1) + except socket.error as err: + if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): + return MQTT_ERR_AGAIN + if err.errno == EAGAIN: + return MQTT_ERR_AGAIN + print(err) + return 1 + else: + if len(command) == 0: + return 1 + command = struct.unpack("!B", command) + self._in_packet['command'] = command[0] + + if self._in_packet['have_remaining'] == 0: + # Read remaining + # Algorithm for decoding taken from pseudo code at + # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm + while True: + try: + if self._ssl: + byte = self._ssl.read(1) + else: + byte = self._sock.recv(1) + except socket.error as err: + if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): + return MQTT_ERR_AGAIN + if err.errno == EAGAIN: + return MQTT_ERR_AGAIN + print(err) + return 1 + else: + byte = struct.unpack("!B", byte) + byte = byte[0] + self._in_packet['remaining_count'].append(byte) + # Max 4 bytes length for remaining length as defined by protocol. + # Anything more likely means a broken/malicious client. + if len(self._in_packet['remaining_count']) > 4: + return MQTT_ERR_PROTOCOL + + self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult'] + self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128 + + if (byte & 128) == 0: + break + + self._in_packet['have_remaining'] = 1 + self._in_packet['to_process'] = self._in_packet['remaining_length'] + + while self._in_packet['to_process'] > 0: + try: + if self._ssl: + data = self._ssl.read(self._in_packet['to_process']) + else: + data = self._sock.recv(self._in_packet['to_process']) + except socket.error as err: + if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): + return MQTT_ERR_AGAIN + if err.errno == EAGAIN: + return MQTT_ERR_AGAIN + print(err) + return 1 + else: + self._in_packet['to_process'] = self._in_packet['to_process'] - len(data) + self._in_packet['packet'] = self._in_packet['packet'] + data + + # All data for this packet is read. + self._in_packet['pos'] = 0 + rc = self._packet_handle() + + # Free data and reset values + self._in_packet = dict( + command=0, + have_remaining=0, + remaining_count=[], + remaining_mult=1, + remaining_length=0, + packet=b"", + to_process=0, + pos=0) + + self._msgtime_mutex.acquire() + self._last_msg_in = time.time() + self._msgtime_mutex.release() + return rc + + def _packet_write(self): + self._current_out_packet_mutex.acquire() + + while self._current_out_packet: + packet = self._current_out_packet + + try: + if self._ssl: + write_length = self._ssl.write(packet['packet'][packet['pos']:]) + else: + write_length = self._sock.send(packet['packet'][packet['pos']:]) + except AttributeError: + self._current_out_packet_mutex.release() + return MQTT_ERR_SUCCESS + except socket.error as err: + self._current_out_packet_mutex.release() + if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): + return MQTT_ERR_AGAIN + if err.errno == EAGAIN: + return MQTT_ERR_AGAIN + print(err) + return 1 + + if write_length > 0: + packet['to_process'] = packet['to_process'] - write_length + packet['pos'] = packet['pos'] + write_length + + if packet['to_process'] == 0: + if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: + self._callback_mutex.acquire() + if self.on_publish: + self._in_callback = True + self.on_publish(self, self._userdata, packet['mid']) + self._in_callback = False + + self._callback_mutex.release() + + if (packet['command'] & 0xF0) == DISCONNECT: + self._current_out_packet_mutex.release() + + self._msgtime_mutex.acquire() + self._last_msg_out = time.time() + self._msgtime_mutex.release() + + self._callback_mutex.acquire() + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, 0) + self._in_callback = False + self._callback_mutex.release() + + if self._ssl: + self._ssl.close() + self._ssl = None + if self._sock: + self._sock.close() + self._sock = None + return MQTT_ERR_SUCCESS + + self._out_packet_mutex.acquire() + if len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + else: + self._current_out_packet = None + self._out_packet_mutex.release() + else: + pass # FIXME + + self._current_out_packet_mutex.release() + + self._msgtime_mutex.acquire() + self._last_msg_out = time.time() + self._msgtime_mutex.release() + + return MQTT_ERR_SUCCESS + + def _easy_log(self, level, buf): + if self.on_log: + self.on_log(self, self._userdata, level, buf) + + def _check_keepalive(self): + now = time.time() + self._msgtime_mutex.acquire() + last_msg_out = self._last_msg_out + last_msg_in = self._last_msg_in + self._msgtime_mutex.release() + if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): + if self._state == mqtt_cs_connected and self._ping_t == 0: + self._send_pingreq() + self._msgtime_mutex.acquire() + self._last_msg_out = now + self._last_msg_in = now + self._msgtime_mutex.release() + else: + if self._ssl: + self._ssl.close() + self._ssl = None + elif self._sock: + self._sock.close() + self._sock = None + + if self._state == mqtt_cs_disconnecting: + rc = MQTT_ERR_SUCCESS + else: + rc = 1 + self._callback_mutex.acquire() + if self.on_disconnect: + self._in_callback = True + self.on_disconnect(self, self._userdata, rc) + self._in_callback = False + self._callback_mutex.release() + + def _mid_generate(self): + self._last_mid = self._last_mid + 1 + if self._last_mid == 65536: + self._last_mid = 1 + return self._last_mid + + def _topic_wildcard_len_check(self, topic): + # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. + # Also returns MQTT_ERR_INVAL if the topic string is too long. + # Returns MQTT_ERR_SUCCESS if everything is fine. + if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: + return MQTT_ERR_INVAL + else: + return MQTT_ERR_SUCCESS + + def _send_pingreq(self): + self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") + rc = self._send_simple_command(PINGREQ) + if rc == MQTT_ERR_SUCCESS: + self._ping_t = time.time() + return rc + + def _send_pingresp(self): + self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP") + return self._send_simple_command(PINGRESP) + + def _send_puback(self, mid): + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBACK, mid, False) + + def _send_pubcomp(self, mid): + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBCOMP, mid, False) + + def _pack_remaining_length(self, packet, remaining_length): + remaining_bytes = [] + while True: + byte = remaining_length % 128 + remaining_length = remaining_length // 128 + # If there are more digits to encode, set the top bit of this digit + if remaining_length > 0: + byte = byte | 0x80 + + remaining_bytes.append(byte) + packet.extend(struct.pack("!B", byte)) + if remaining_length == 0: + # FIXME - this doesn't deal with incorrectly large payloads + return packet + + def _pack_str16(self, packet, data): + if sys.version_info[0] < 3: + if isinstance(data, bytearray): + packet.extend(struct.pack("!H", len(data))) + packet.extend(data) + elif isinstance(data, str): + udata = data.encode('utf-8') + pack_format = "!H" + str(len(udata)) + "s" + packet.extend(struct.pack(pack_format, len(udata), udata)) + elif isinstance(data, unicode): + udata = data.encode('utf-8') + pack_format = "!H" + str(len(udata)) + "s" + packet.extend(struct.pack(pack_format, len(udata), udata)) + else: + raise TypeError + else: + if isinstance(data, bytearray) or isinstance(data, bytes): + packet.extend(struct.pack("!H", len(data))) + packet.extend(data) + elif isinstance(data, str): + udata = data.encode('utf-8') + pack_format = "!H" + str(len(udata)) + "s" + packet.extend(struct.pack(pack_format, len(udata), udata)) + else: + raise TypeError + + def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): + if self._sock is None and self._ssl is None: + return MQTT_ERR_NO_CONN + + utopic = topic.encode('utf-8') + command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain + packet = bytearray() + packet.extend(struct.pack("!B", command)) + if payload is None: + remaining_length = 2+len(utopic) + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)") + else: + if isinstance(payload, str): + upayload = payload.encode('utf-8') + payloadlen = len(upayload) + elif isinstance(payload, bytearray): + payloadlen = len(payload) + elif isinstance(payload, unicode): + upayload = payload.encode('utf-8') + payloadlen = len(upayload) + + remaining_length = 2+len(utopic) + payloadlen + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"', ... ("+str(payloadlen)+" bytes)") + + if qos > 0: + # For message id + remaining_length = remaining_length + 2 + + self._pack_remaining_length(packet, remaining_length) + self._pack_str16(packet, topic) + + if qos > 0: + # For message id + packet.extend(struct.pack("!H", mid)) + + if payload is not None: + if isinstance(payload, str): + pack_format = str(payloadlen) + "s" + packet.extend(struct.pack(pack_format, upayload)) + elif isinstance(payload, bytearray): + packet.extend(payload) + elif isinstance(payload, unicode): + pack_format = str(payloadlen) + "s" + packet.extend(struct.pack(pack_format, upayload)) + else: + raise TypeError('payload must be a string, unicode or a bytearray.') + + return self._packet_queue(PUBLISH, packet, mid, qos) + + def _send_pubrec(self, mid): + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBREC, mid, False) + + def _send_pubrel(self, mid, dup=False): + self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") + return self._send_command_with_mid(PUBREL|2, mid, dup) + + def _send_command_with_mid(self, command, mid, dup): + # For PUBACK, PUBCOMP, PUBREC, and PUBREL + if dup: + command = command | 8 + + remaining_length = 2 + packet = struct.pack('!BBH', command, remaining_length, mid) + return self._packet_queue(command, packet, mid, 1) + + def _send_simple_command(self, command): + # For DISCONNECT, PINGREQ and PINGRESP + remaining_length = 0 + packet = struct.pack('!BB', command, remaining_length) + return self._packet_queue(command, packet, 0, 0) + + def _send_connect(self, keepalive, clean_session): + if self._protocol == MQTTv31: + protocol = PROTOCOL_NAMEv31 + proto_ver = 3 + else: + protocol = PROTOCOL_NAMEv311 + proto_ver = 4 + remaining_length = 2+len(protocol) + 1+1+2 + 2+len(self._client_id) + connect_flags = 0 + if clean_session: + connect_flags = connect_flags | 0x02 + + if self._will: + if self._will_payload is not None: + remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload) + else: + remaining_length = remaining_length + 2+len(self._will_topic) + 2 + + connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5) + + if self._username: + remaining_length = remaining_length + 2+len(self._username) + connect_flags = connect_flags | 0x80 + if self._password: + connect_flags = connect_flags | 0x40 + remaining_length = remaining_length + 2+len(self._password) + + command = CONNECT + packet = bytearray() + packet.extend(struct.pack("!B", command)) + + self._pack_remaining_length(packet, remaining_length) + packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive)) + + self._pack_str16(packet, self._client_id) + + if self._will: + self._pack_str16(packet, self._will_topic) + if self._will_payload is None or len(self._will_payload) == 0: + packet.extend(struct.pack("!H", 0)) + else: + self._pack_str16(packet, self._will_payload) + + if self._username: + self._pack_str16(packet, self._username) + + if self._password: + self._pack_str16(packet, self._password) + + self._keepalive = keepalive + return self._packet_queue(command, packet, 0, 0) + + def _send_disconnect(self): + return self._send_simple_command(DISCONNECT) + + def _send_subscribe(self, dup, topics): + remaining_length = 2 + for t in topics: + remaining_length = remaining_length + 2+len(t[0])+1 + + command = SUBSCRIBE | (dup<<3) | (1<<1) + packet = bytearray() + packet.extend(struct.pack("!B", command)) + self._pack_remaining_length(packet, remaining_length) + local_mid = self._mid_generate() + packet.extend(struct.pack("!H", local_mid)) + for t in topics: + self._pack_str16(packet, t[0]) + packet.extend(struct.pack("B", t[1])) + return (self._packet_queue(command, packet, local_mid, 1), local_mid) + + def _send_unsubscribe(self, dup, topics): + remaining_length = 2 + for t in topics: + remaining_length = remaining_length + 2+len(t) + + command = UNSUBSCRIBE | (dup<<3) | (1<<1) + packet = bytearray() + packet.extend(struct.pack("!B", command)) + self._pack_remaining_length(packet, remaining_length) + local_mid = self._mid_generate() + packet.extend(struct.pack("!H", local_mid)) + for t in topics: + self._pack_str16(packet, t) + return (self._packet_queue(command, packet, local_mid, 1), local_mid) + + def _message_retry_check_actual(self, messages, mutex): + mutex.acquire() + now = time.time() + for m in messages: + if m.timestamp + self._message_retry < now: + if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: + m.timestamp = now + m.dup = True + self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + elif m.state == mqtt_ms_wait_for_pubrel: + m.timestamp = now + m.dup = True + self._send_pubrec(m.mid) + elif m.state == mqtt_ms_wait_for_pubcomp: + m.timestamp = now + m.dup = True + self._send_pubrel(m.mid, True) + mutex.release() + + def _message_retry_check(self): + self._message_retry_check_actual(self._out_messages, self._out_message_mutex) + self._message_retry_check_actual(self._in_messages, self._in_message_mutex) + + def _messages_reconnect_reset_out(self): + self._out_message_mutex.acquire() + self._inflight_messages = 0 + for m in self._out_messages: + m.timestamp = 0 + if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: + if m.qos == 0: + m.state = mqtt_ms_publish + elif m.qos == 1: + #self._inflight_messages = self._inflight_messages + 1 + if m.state == mqtt_ms_wait_for_puback: + m.dup = True + m.state = mqtt_ms_publish + elif m.qos == 2: + #self._inflight_messages = self._inflight_messages + 1 + if m.state == mqtt_ms_wait_for_pubcomp: + m.state = mqtt_ms_resend_pubrel + m.dup = True + else: + if m.state == mqtt_ms_wait_for_pubrec: + m.dup = True + m.state = mqtt_ms_publish + else: + m.state = mqtt_ms_queued + self._out_message_mutex.release() + + def _messages_reconnect_reset_in(self): + self._in_message_mutex.acquire() + for m in self._in_messages: + m.timestamp = 0 + if m.qos != 2: + self._in_messages.pop(self._in_messages.index(m)) + else: + # Preserve current state + pass + self._in_message_mutex.release() + + def _messages_reconnect_reset(self): + self._messages_reconnect_reset_out() + self._messages_reconnect_reset_in() + + def _packet_queue(self, command, packet, mid, qos): + mpkt = dict( + command = command, + mid = mid, + qos = qos, + pos = 0, + to_process = len(packet), + packet = packet) + + self._out_packet_mutex.acquire() + self._out_packet.append(mpkt) + if self._current_out_packet_mutex.acquire(False): + if self._current_out_packet is None and len(self._out_packet) > 0: + self._current_out_packet = self._out_packet.pop(0) + self._current_out_packet_mutex.release() + self._out_packet_mutex.release() + + # Write a single byte to sockpairW (connected to sockpairR) to break + # out of select() if in threaded mode. + try: + self._sockpairW.send(sockpair_data) + except socket.error as err: + if err.errno != EAGAIN: + raise + + if not self._in_callback and self._thread is None: + return self.loop_write() + else: + return MQTT_ERR_SUCCESS + + def _packet_handle(self): + cmd = self._in_packet['command']&0xF0 + if cmd == PINGREQ: + return self._handle_pingreq() + elif cmd == PINGRESP: + return self._handle_pingresp() + elif cmd == PUBACK: + return self._handle_pubackcomp("PUBACK") + elif cmd == PUBCOMP: + return self._handle_pubackcomp("PUBCOMP") + elif cmd == PUBLISH: + return self._handle_publish() + elif cmd == PUBREC: + return self._handle_pubrec() + elif cmd == PUBREL: + return self._handle_pubrel() + elif cmd == CONNACK: + return self._handle_connack() + elif cmd == SUBACK: + return self._handle_suback() + elif cmd == UNSUBACK: + return self._handle_unsuback() + else: + # If we don't recognise the command, return an error straight away. + self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd)) + return MQTT_ERR_PROTOCOL + + def _handle_pingreq(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 0: + return MQTT_ERR_PROTOCOL + + self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ") + return self._send_pingresp() + + def _handle_pingresp(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 0: + return MQTT_ERR_PROTOCOL + + # No longer waiting for a PINGRESP. + self._ping_t = 0 + self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP") + return MQTT_ERR_SUCCESS + + def _handle_connack(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 2: + return MQTT_ERR_PROTOCOL + + if len(self._in_packet['packet']) != 2: + return MQTT_ERR_PROTOCOL + + (flags, result) = struct.unpack("!BB", self._in_packet['packet']) + if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311: + self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.") + # Downgrade to MQTT v3.1 + self._protocol = MQTTv31 + return self.reconnect() + + if result == 0: + self._state = mqtt_cs_connected + + self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")") + self._callback_mutex.acquire() + if self.on_connect: + self._in_callback = True + + if sys.version_info[0] < 3: + argcount = self.on_connect.func_code.co_argcount + else: + argcount = self.on_connect.__code__.co_argcount + + if argcount == 3: + self.on_connect(self, self._userdata, result) + else: + flags_dict = dict() + flags_dict['session present'] = flags & 0x01 + self.on_connect(self, self._userdata, flags_dict, result) + self._in_callback = False + self._callback_mutex.release() + if result == 0: + rc = 0 + self._out_message_mutex.acquire() + for m in self._out_messages: + m.timestamp = time.time() + if m.state == mqtt_ms_queued: + self.loop_write() # Process outgoing messages that have just been queued up + self._out_message_mutex.release() + return MQTT_ERR_SUCCESS + + if m.qos == 0: + self._in_callback = True # Don't call loop_write after _send_publish() + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.qos == 1: + if m.state == mqtt_ms_publish: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_puback + self._in_callback = True # Don't call loop_write after _send_publish() + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.qos == 2: + if m.state == mqtt_ms_publish: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_pubrec + self._in_callback = True # Don't call loop_write after _send_publish() + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + self._in_callback = False + if rc != 0: + self._out_message_mutex.release() + return rc + elif m.state == mqtt_ms_resend_pubrel: + self._inflight_messages = self._inflight_messages + 1 + m.state = mqtt_ms_wait_for_pubcomp + self._in_callback = True # Don't call loop_write after _send_pubrel() + rc = self._send_pubrel(m.mid, m.dup) + self._in_callback = False + if rc != 0: + self._out_message_mutex.release() + return rc + self.loop_write() # Process outgoing messages that have just been queued up + self._out_message_mutex.release() + return rc + elif result > 0 and result < 6: + return MQTT_ERR_CONN_REFUSED + else: + return MQTT_ERR_PROTOCOL + + def _handle_suback(self): + self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") + pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' + (mid, packet) = struct.unpack(pack_format, self._in_packet['packet']) + pack_format = "!" + "B"*len(packet) + granted_qos = struct.unpack(pack_format, packet) + + self._callback_mutex.acquire() + if self.on_subscribe: + self._in_callback = True + self.on_subscribe(self, self._userdata, mid, granted_qos) + self._in_callback = False + self._callback_mutex.release() + + return MQTT_ERR_SUCCESS + + def _handle_publish(self): + rc = 0 + + header = self._in_packet['command'] + message = MQTTMessage() + message.dup = (header & 0x08)>>3 + message.qos = (header & 0x06)>>1 + message.retain = (header & 0x01) + + pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' + (slen, packet) = struct.unpack(pack_format, self._in_packet['packet']) + pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's' + (message.topic, packet) = struct.unpack(pack_format, packet) + + if len(message.topic) == 0: + return MQTT_ERR_PROTOCOL + + if sys.version_info[0] >= 3: + message.topic = message.topic.decode('utf-8') + + if message.qos > 0: + pack_format = "!H" + str(len(packet)-2) + 's' + (message.mid, packet) = struct.unpack(pack_format, packet) + + message.payload = packet + + self._easy_log( + MQTT_LOG_DEBUG, + "Received PUBLISH (d"+str(message.dup)+ + ", q"+str(message.qos)+", r"+str(message.retain)+ + ", m"+str(message.mid)+", '"+message.topic+ + "', ... ("+str(len(message.payload))+" bytes)") + + message.timestamp = time.time() + if message.qos == 0: + self._handle_on_message(message) + return MQTT_ERR_SUCCESS + elif message.qos == 1: + rc = self._send_puback(message.mid) + self._handle_on_message(message) + return rc + elif message.qos == 2: + rc = self._send_pubrec(message.mid) + message.state = mqtt_ms_wait_for_pubrel + self._in_message_mutex.acquire() + self._in_messages.append(message) + self._in_message_mutex.release() + return rc + else: + return MQTT_ERR_PROTOCOL + + def _handle_pubrel(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 2: + return MQTT_ERR_PROTOCOL + + if len(self._in_packet['packet']) != 2: + return MQTT_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet['packet']) + mid = mid[0] + self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") + + self._in_message_mutex.acquire() + for i in range(len(self._in_messages)): + if self._in_messages[i].mid == mid: + + # Only pass the message on if we have removed it from the queue - this + # prevents multiple callbacks for the same message. + self._handle_on_message(self._in_messages[i]) + self._in_messages.pop(i) + self._inflight_messages = self._inflight_messages - 1 + if self._max_inflight_messages > 0: + self._out_message_mutex.acquire() + rc = self._update_inflight() + self._out_message_mutex.release() + if rc != MQTT_ERR_SUCCESS: + self._in_message_mutex.release() + return rc + + self._in_message_mutex.release() + return self._send_pubcomp(mid) + + self._in_message_mutex.release() + return MQTT_ERR_SUCCESS + + def _update_inflight(self): + # Dont lock message_mutex here + for m in self._out_messages: + if self._inflight_messages < self._max_inflight_messages: + if m.qos > 0 and m.state == mqtt_ms_queued: + self._inflight_messages = self._inflight_messages + 1 + if m.qos == 1: + m.state = mqtt_ms_wait_for_puback + elif m.qos == 2: + m.state = mqtt_ms_wait_for_pubrec + rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) + if rc != 0: + return rc + else: + return MQTT_ERR_SUCCESS + return MQTT_ERR_SUCCESS + + def _handle_pubrec(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 2: + return MQTT_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet['packet']) + mid = mid[0] + self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") + + self._out_message_mutex.acquire() + for m in self._out_messages: + if m.mid == mid: + m.state = mqtt_ms_wait_for_pubcomp + m.timestamp = time.time() + self._out_message_mutex.release() + return self._send_pubrel(mid, False) + + self._out_message_mutex.release() + return MQTT_ERR_SUCCESS + + def _handle_unsuback(self): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 2: + return MQTT_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet['packet']) + mid = mid[0] + self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") + self._callback_mutex.acquire() + if self.on_unsubscribe: + self._in_callback = True + self.on_unsubscribe(self, self._userdata, mid) + self._in_callback = False + self._callback_mutex.release() + return MQTT_ERR_SUCCESS + + def _handle_pubackcomp(self, cmd): + if self._strict_protocol: + if self._in_packet['remaining_length'] != 2: + return MQTT_ERR_PROTOCOL + + mid = struct.unpack("!H", self._in_packet['packet']) + mid = mid[0] + self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") + + self._out_message_mutex.acquire() + for i in range(len(self._out_messages)): + try: + if self._out_messages[i].mid == mid: + # Only inform the client the message has been sent once. + self._callback_mutex.acquire() + if self.on_publish: + self._out_message_mutex.release() + self._in_callback = True + self.on_publish(self, self._userdata, mid) + self._in_callback = False + self._out_message_mutex.acquire() + + self._callback_mutex.release() + self._out_messages.pop(i) + self._inflight_messages = self._inflight_messages - 1 + if self._max_inflight_messages > 0: + rc = self._update_inflight() + if rc != MQTT_ERR_SUCCESS: + self._out_message_mutex.release() + return rc + self._out_message_mutex.release() + return MQTT_ERR_SUCCESS + except IndexError: + # Have removed item so i>count. + # Not really an error. + pass + + self._out_message_mutex.release() + return MQTT_ERR_SUCCESS + + def _handle_on_message(self, message): + self._callback_mutex.acquire() + matched = False + for t in self.on_message_filtered: + if topic_matches_sub(t[0], message.topic): + self._in_callback = True + t[1](self, self._userdata, message) + self._in_callback = False + matched = True + + if matched == False and self.on_message: + self._in_callback = True + self.on_message(self, self._userdata, message) + self._in_callback = False + + self._callback_mutex.release() + + def _thread_main(self): + self._state_mutex.acquire() + if self._state == mqtt_cs_connect_async: + self._state_mutex.release() + self.reconnect() + else: + self._state_mutex.release() + + self.loop_forever() + + def _host_matches_cert(self, host, cert_host): + if cert_host[0:2] == "*.": + if cert_host.count("*") != 1: + return False + + host_match = host.split(".", 1)[1] + cert_match = cert_host.split(".", 1)[1] + if host_match == cert_match: + return True + else: + return False + else: + if host == cert_host: + return True + else: + return False + + def _tls_match_hostname(self): + cert = self._ssl.getpeercert() + san = cert.get('subjectAltName') + if san: + have_san_dns = False + for (key, value) in san: + if key == 'DNS': + have_san_dns = True + if self._host_matches_cert(self._host.lower(), value.lower()) == True: + return + if key == 'IP Address': + have_san_dns = True + if value.lower() == self._host.lower(): + return + + if have_san_dns: + # Only check subject if subjectAltName dns not found. + raise ssl.SSLError('Certificate subject does not match remote hostname.') + subject = cert.get('subject') + if subject: + for ((key, value),) in subject: + if key == 'commonName': + if self._host_matches_cert(self._host.lower(), value.lower()) == True: + return + + raise ssl.SSLError('Certificate subject does not match remote hostname.') + + +# Compatibility class for easy porting from mosquitto.py. +class Mosquitto(Client): + def __init__(self, client_id="", clean_session=True, userdata=None): + super(Mosquitto, self).__init__(client_id, clean_session, userdata) diff --git a/lib/paho/mqtt/publish.py b/lib/paho/mqtt/publish.py new file mode 100644 index 00000000..b468dda8 --- /dev/null +++ b/lib/paho/mqtt/publish.py @@ -0,0 +1,217 @@ +# Copyright (c) 2014 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial API and implementation + +""" +This module provides some helper functions to allow straightforward publishing +of messages in a one-shot manner. In other words, they are useful for the +situation where you have a single/multiple messages you want to publish to a +broker, then disconnect and nothing else is required. +""" + +import paho.mqtt.client as mqtt + + +def _do_publish(c): + """Internal function""" + m = c._userdata[0] + c._userdata = c._userdata[1:] + if type(m) is dict: + topic = m['topic'] + try: + payload = m['payload'] + except KeyError: + payload = None + try: + qos = m['qos'] + except KeyError: + qos = 0 + try: + retain = m['retain'] + except KeyError: + retain = False + elif type(m) is tuple: + (topic, payload, qos, retain) = m + else: + raise ValueError('message must be a dict or a tuple') + + c.publish(topic, payload, qos, retain) + + +def _on_connect(c, userdata, flags, rc): + """Internal callback""" + _do_publish(c) + + +def _on_publish(c, userdata, mid): + """Internal callback""" + if len(userdata) == 0: + c.disconnect() + else: + _do_publish(c) + + +def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, + will=None, auth=None, tls=None, protocol=mqtt.MQTTv31): + """Publish multiple messages to a broker, then disconnect cleanly. + + This function creates an MQTT client, connects to a broker and publishes a + list of messages. Once the messages have been delivered, it disconnects + cleanly from the broker. + + msgs : a list of messages to publish. Each message is either a dict or a + tuple. + + If a dict, only the topic must be present. Default values will be + used for any missing arguments. The dict must be of the form: + + msg = {'topic':"", 'payload':"", 'qos':, + 'retain':} + topic must be present and may not be empty. + If payload is "", None or not present then a zero length payload + will be published. + If qos is not present, the default of 0 is used. + If retain is not present, the default of False is used. + + If a tuple, then it must be of the form: + ("", "", qos, retain) + hostname : a string containing the address of the broker to connect to. + Defaults to localhost. + port : the port to connect to the broker on. Defaults to 1883. + client_id : the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. + keepalive : the keepalive timeout value for the client. Defaults to 60 + seconds. + will : a dict containing will parameters for the client: will = {'topic': + "", 'payload':", 'qos':, 'retain':}. + Topic is required, all other parameters are optional and will + default to None, 0 and False respectively. + Defaults to None, which indicates no will should be used. + auth : a dict containing authentication parameters for the client: + auth = {'username':"", 'password':""} + Username is required, password is optional and will default to None + if not provided. + Defaults to None, which indicates no authentication is to be used. + tls : a dict containing TLS configuration parameters for the client: + dict = {'ca_certs':"", 'certfile':"", + 'keyfile':"", 'tls_version':"", + 'ciphers':"} + ca_certs is required, all other parameters are optional and will + default to None if not provided, which results in the client using + the default behaviour - see the paho.mqtt.client documentation. + Defaults to None, which indicates that TLS should not be used. + """ + + if type(msgs) is not list: + raise ValueError('msgs must be a list') + + client = mqtt.Client(client_id=client_id, + userdata=msgs, protocol=protocol) + client.on_publish = _on_publish + client.on_connect = _on_connect + + if auth is not None: + username = auth['username'] + try: + password = auth['password'] + except KeyError: + password = None + client.username_pw_set(username, password) + + if will is not None: + will_topic = will['topic'] + try: + will_payload = will['payload'] + except KeyError: + will_payload = None + try: + will_qos = will['qos'] + except KeyError: + will_qos = 0 + try: + will_retain = will['retain'] + except KeyError: + will_retain = False + + client.will_set(will_topic, will_payload, will_qos, will_retain) + + if tls is not None: + ca_certs = tls['ca_certs'] + try: + certfile = tls['certfile'] + except KeyError: + certfile = None + try: + keyfile = tls['keyfile'] + except KeyError: + keyfile = None + try: + tls_version = tls['tls_version'] + except KeyError: + tls_version = None + try: + ciphers = tls['ciphers'] + except KeyError: + ciphers = None + client.tls_set(ca_certs, certfile, keyfile, tls_version=tls_version, + ciphers=ciphers) + + client.connect(hostname, port, keepalive) + client.loop_forever() + + +def single(topic, payload=None, qos=0, retain=False, hostname="localhost", + port=1883, client_id="", keepalive=60, will=None, auth=None, + tls=None, protocol=mqtt.MQTTv31): + """Publish a single message to a broker, then disconnect cleanly. + + This function creates an MQTT client, connects to a broker and publishes a + single message. Once the message has been delivered, it disconnects cleanly + from the broker. + + topic : the only required argument must be the topic string to which the + payload will be published. + payload : the payload to be published. If "" or None, a zero length payload + will be published. + qos : the qos to use when publishing, default to 0. + retain : set the message to be retained (True) or not (False). + hostname : a string containing the address of the broker to connect to. + Defaults to localhost. + port : the port to connect to the broker on. Defaults to 1883. + client_id : the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. + keepalive : the keepalive timeout value for the client. Defaults to 60 + seconds. + will : a dict containing will parameters for the client: will = {'topic': + "", 'payload':", 'qos':, 'retain':}. + Topic is required, all other parameters are optional and will + default to None, 0 and False respectively. + Defaults to None, which indicates no will should be used. + auth : a dict containing authentication parameters for the client: + auth = {'username':"", 'password':""} + Username is required, password is optional and will default to None + if not provided. + Defaults to None, which indicates no authentication is to be used. + tls : a dict containing TLS configuration parameters for the client: + dict = {'ca_certs':"", 'certfile':"", + 'keyfile':"", 'tls_version':"", + 'ciphers':"} + ca_certs is required, all other parameters are optional and will + default to None if not provided, which results in the client using + the default behaviour - see the paho.mqtt.client documentation. + Defaults to None, which indicates that TLS should not be used. + """ + + msg = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain} + multiple([msg], hostname, port, client_id, keepalive, will, auth, tls, protocol) + diff --git a/plexpy/notifiers.py b/plexpy/notifiers.py index 0ad5fa38..fdcc9ce2 100644 --- a/plexpy/notifiers.py +++ b/plexpy/notifiers.py @@ -19,6 +19,7 @@ import json from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import email.utils +from paho.mqtt.publish import single import os import re import requests @@ -84,7 +85,8 @@ AGENT_IDS = {'growl': 0, 'hipchat': 19, 'discord': 20, 'androidapp': 21, - 'groupme': 22 + 'groupme': 22, + 'mqtt': 23 } @@ -137,6 +139,10 @@ def available_notification_agents(): 'name': 'nma', 'id': AGENT_IDS['nma'] }, + {'label': 'MQTT', + 'name': 'mqtt', + 'id': AGENT_IDS['mqtt'] + }, {'label': 'Plex Home Theater', 'name': 'plex', 'id': AGENT_IDS['plex'] @@ -365,6 +371,8 @@ def get_agent_class(agent_id=None, config=None): return ANDROIDAPP(config=config) elif agent_id == 22: return GROUPME(config=config) + elif agent_id == 23: + return MQTT(config=config) else: return Notifier(config=config) else: @@ -2022,6 +2030,119 @@ class JOIN(Notifier): return config_option +class MQTT(Notifier): + """ + MQTT notifications + """ + _DEFAULT_CONFIG = {'broker': '', + 'port': 1883, + 'protocol': 'MQTTv311', + 'username': '', + 'password': '', + 'client_id': 'plexpy', + 'topic': '', + 'qos': 1, + 'retain': 0, + 'keep_alive': 60 + } + + def notify(self, subject='', body='', action='', **kwargs): + if not subject or not body: + return + + if not self.config['topic']: + logger.error(u"PlexPy Notifiers :: MQTT topic not specified.") + return + + data = {'subject': subject.encode("utf-8"), + 'body': body.encode("utf-8"), + 'topic': self.config['topic'].encode("utf-8")} + + auth = {} + if self.config['username']: + auth['username'] = self.config['username'] + if self.config['password']: + auth['password'] = self.config['password'] + + single(self.config['topic'], payload=json.dumps(data), qos=self.config['qos'], retain=bool(self.config['retain']), + hostname=self.config['broker'], port=self.config['port'], client_id=self.config['client_id'], + keepalive=self.config['keep_alive'], auth=auth or None, protocol=self.config['protocol']) + + return True + + def return_config_options(self): + config_option = [{'label': 'Broker', + 'value': self.config['broker'], + 'name': 'mqtt_broker', + 'description': 'The hostname or IP address of the MQTT broker.', + 'input_type': 'text' + }, + {'label': 'Port', + 'value': self.config['port'], + 'name': 'mqtt_port', + 'description': 'The network port for connecting to the MQTT broker.', + 'input_type': 'number' + }, + {'label': 'Protocol', + 'value': self.config['protocol'], + 'name': 'mqtt_protocol', + 'description': 'The MQTT protocol version.', + 'input_type': 'select', + 'select_options': {'MQTTv31': '3.1', + 'MQTTv311': '3.1.1' + } + }, + {'label': 'Client ID', + 'value': self.config['client_id'], + 'name': 'mqtt_client_id', + 'description': 'The client ID for connecting to the MQTT broker.', + 'input_type': 'text' + }, + {'label': 'Username', + 'value': self.config['username'], + 'name': 'mqtt_username', + 'description': 'The username to authenticate with the MQTT broker.', + 'input_type': 'text' + }, + {'label': 'Password', + 'value': self.config['password'], + 'name': 'mqtt_password', + 'description': 'The password to authenticate with the MQTT broker.', + 'input_type': 'password' + }, + {'label': 'Topic', + 'value': self.config['topic'], + 'name': 'mqtt_topic', + 'description': 'The topic to publish notifications to.', + 'input_type': 'text' + }, + {'label': 'Quality of Service', + 'value': self.config['qos'], + 'name': 'mqtt_qos', + 'description': 'The quality of service level to use when publishing the notification.', + 'input_type': 'select', + 'select_options': {0: 0, + 1: 1, + 2: 2 + } + }, + {'label': 'Retain Message', + 'value': self.config['retain'], + 'name': 'mqtt_retain', + 'description': 'Set the message to be retained on the MQTT broker.', + 'input_type': 'checkbox' + }, + {'label': 'Keep-Alive', + 'value': self.config['keep_alive'], + 'name': 'mqtt_keep_alive', + 'description': 'Maximum period in seconds before timing out the connection with the broker.', + 'input_type': 'number' + } + ] + + return config_option + + class NMA(Notifier): """ Notify My Android notifications @@ -3359,3 +3480,4 @@ def upgrade_config_to_db(): # Add a new notifier and update the config notifier_id = add_notifier_config(agent_id=agent_id) set_notifier_config(notifier_id=notifier_id, agent_id=agent_id, **notifier_config) +