From 41ef595edc4554082d77a4163ef0bbcd96c965aa Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Fri, 15 Oct 2021 00:08:51 -0700 Subject: [PATCH] Update xmltodict-0.12.0 --- lib/xmltodict.py | 183 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 153 insertions(+), 30 deletions(-) diff --git a/lib/xmltodict.py b/lib/xmltodict.py index 746a4bcd..d6dbcd7a 100644 --- a/lib/xmltodict.py +++ b/lib/xmltodict.py @@ -1,7 +1,10 @@ #!/usr/bin/env python "Makes working with XML feel like you are working with JSON" -from xml.parsers import expat +try: + from defusedexpat import pyexpat as expat +except ImportError: + from xml.parsers import expat from xml.sax.saxutils import XMLGenerator from xml.sax.xmlreader import AttributesImpl try: # pragma no cover @@ -11,13 +14,8 @@ except ImportError: # pragma no cover from StringIO import StringIO except ImportError: from io import StringIO -try: # pragma no cover - from collections import OrderedDict -except ImportError: # pragma no cover - try: - from ordereddict import OrderedDict - except ImportError: - OrderedDict = dict + +from collections import OrderedDict try: # pragma no cover _basestring = basestring @@ -29,7 +27,7 @@ except NameError: # pragma no cover _unicode = str __author__ = 'Martin Blech' -__version__ = '0.9.2' +__version__ = '0.12.0' __license__ = 'MIT' @@ -50,10 +48,11 @@ class _DictSAXHandler(object): dict_constructor=OrderedDict, strip_whitespace=True, namespace_separator=':', - namespaces=None): + namespaces=None, + force_list=None): self.path = [] self.stack = [] - self.data = None + self.data = [] self.item = None self.item_depth = item_depth self.xml_attribs = xml_attribs @@ -67,6 +66,8 @@ class _DictSAXHandler(object): self.strip_whitespace = strip_whitespace self.namespace_separator = namespace_separator self.namespaces = namespaces + self.namespace_declarations = OrderedDict() + self.force_list = force_list def _build_name(self, full_name): if not 
self.namespaces: @@ -86,34 +87,51 @@ class _DictSAXHandler(object): return attrs return self.dict_constructor(zip(attrs[0::2], attrs[1::2])) + def startNamespaceDecl(self, prefix, uri): + self.namespace_declarations[prefix or ''] = uri + def startElement(self, full_name, attrs): name = self._build_name(full_name) attrs = self._attrs_to_dict(attrs) + if attrs and self.namespace_declarations: + attrs['xmlns'] = self.namespace_declarations + self.namespace_declarations = OrderedDict() self.path.append((name, attrs or None)) if len(self.path) > self.item_depth: self.stack.append((self.item, self.data)) if self.xml_attribs: - attrs = self.dict_constructor( - (self.attr_prefix+self._build_name(key), value) - for (key, value) in attrs.items()) + attr_entries = [] + for key, value in attrs.items(): + key = self.attr_prefix+self._build_name(key) + if self.postprocessor: + entry = self.postprocessor(self.path, key, value) + else: + entry = (key, value) + if entry: + attr_entries.append(entry) + attrs = self.dict_constructor(attr_entries) else: attrs = None self.item = attrs or None - self.data = None + self.data = [] def endElement(self, full_name): name = self._build_name(full_name) if len(self.path) == self.item_depth: item = self.item if item is None: - item = self.data + item = (None if not self.data + else self.cdata_separator.join(self.data)) + should_continue = self.item_callback(self.path, item) if not should_continue: raise ParsingInterrupted() if len(self.stack): - item, data = self.item, self.data + data = (None if not self.data + else self.cdata_separator.join(self.data)) + item = self.item self.item, self.data = self.stack.pop() - if self.strip_whitespace and data is not None: + if self.strip_whitespace and data: data = data.strip() or None if data and self.force_cdata and item is None: item = self.dict_constructor() @@ -124,14 +142,15 @@ class _DictSAXHandler(object): else: self.item = self.push_data(self.item, name, data) else: - self.item = self.data = None + 
self.item = None + self.data = [] self.path.pop() def characters(self, data): if not self.data: - self.data = data + self.data = [data] else: - self.data += self.cdata_separator + data + self.data.append(data) def push_data(self, item, key, data): if self.postprocessor is not None: @@ -148,12 +167,25 @@ class _DictSAXHandler(object): else: item[key] = [value, data] except KeyError: - item[key] = data + if self._should_force_list(key, data): + item[key] = [data] + else: + item[key] = data return item + def _should_force_list(self, key, value): + if not self.force_list: + return False + if isinstance(self.force_list, bool): + return self.force_list + try: + return key in self.force_list + except TypeError: + return self.force_list(self.path[:-1], key, value) + def parse(xml_input, encoding=None, expat=expat, process_namespaces=False, - namespace_separator=':', **kwargs): + namespace_separator=':', disable_entities=True, **kwargs): \"\"\"Parse the given XML input and convert it into a dictionary. `xml_input` can either be a `string` or a file-like object. @@ -189,7 +221,7 @@ def parse(xml_input, encoding=None, expat=expat, process_namespaces=False, Streaming example:: >>> def handle(path, item): - ... print 'path:%s item:%s' % (path, item) + ... print('path:%s item:%s' % (path, item)) ... return True ... >>> xmltodict.parse(\"\"\" @@ -220,6 +252,41 @@ def parse(xml_input, encoding=None, expat=expat, process_namespaces=False, >>> xmltodict.parse('<a>hello</a>', expat=defusedexpat.pyexpat) OrderedDict([(u'a', u'hello')]) + You can use the force_list argument to force lists to be created even + when there is only a single child of a given level of hierarchy. The + force_list argument is a tuple of keys. If the key for a given level + of hierarchy is in the force_list argument, that level of hierarchy + will have a list as a child (even if there is only one sub-element). + The index_keys operation takes precedence over this. 
This is applied + after any user-supplied postprocessor has already run. + + For example, given this input: + <servers> + <server> + <name>host1</name> + <os>Linux</os> + <interfaces> + <interface> + <name>em0</name> + <ip_address>10.0.0.1</ip_address> + </interface> + </interfaces> + </server> + </servers> + + If called with force_list=('interface',), it will produce + this dictionary: + {'servers': + {'server': + {'name': 'host1', + 'os': 'Linux'}, + 'interfaces': + {'interface': + [ {'name': 'em0', 'ip_address': '10.0.0.1' } ] } } } + + `force_list` can also be a callable that receives `path`, `key` and + `value`. This is helpful in cases where the logic that decides whether + a list should be forced is more complex. \"\"\" handler = _DictSAXHandler(namespace_separator=namespace_separator, **kwargs) @@ -238,17 +305,44 @@ def parse(xml_input, encoding=None, expat=expat, process_namespaces=False, except AttributeError: # Jython's expat does not support ordered_attributes pass + parser.StartNamespaceDeclHandler = handler.startNamespaceDecl parser.StartElementHandler = handler.startElement parser.EndElementHandler = handler.endElement parser.CharacterDataHandler = handler.characters parser.buffer_text = True - try: + if disable_entities: + try: + # Attempt to disable DTD in Jython's expat parser (Xerces-J). + feature = "http://apache.org/xml/features/disallow-doctype-decl" + parser._reader.setFeature(feature, True) + except AttributeError: + # For CPython / expat parser. + # Anything not handled ends up here and entities aren't expanded. + parser.DefaultHandler = lambda x: None + # Expects an integer return; zero means failure -> expat.ExpatError. 
+ parser.ExternalEntityRefHandler = lambda *x: 1 + if hasattr(xml_input, 'read'): parser.ParseFile(xml_input) - except (TypeError, AttributeError): + else: parser.Parse(xml_input, True) return handler.item +def _process_namespace(name, namespaces, ns_sep=':', attr_prefix='@'): + if not namespaces: + return name + try: + ns, name = name.rsplit(ns_sep, 1) + except ValueError: + pass + else: + ns_res = namespaces.get(ns.strip(attr_prefix)) + name = '{}{}{}{}'.format( + attr_prefix if ns.startswith(attr_prefix) else '', + ns_res, ns_sep, name) if ns_res else name + return name + + def _emit(key, value, content_handler, attr_prefix='@', cdata_key='#text', @@ -257,7 +351,10 @@ def _emit(key, value, content_handler, pretty=False, newl='\n', indent='\t', + namespace_separator=':', + namespaces=None, full_document=True): + key = _process_namespace(key, namespaces, namespace_separator, attr_prefix) if preprocessor is not None: result = preprocessor(key, value) if result is None: @@ -272,6 +369,11 @@ def _emit(key, value, content_handler, raise ValueError('document with multiple roots') if v is None: v = OrderedDict() + elif isinstance(v, bool): + if v: + v = _unicode('true') + else: + v = _unicode('false') elif not isinstance(v, dict): v = _unicode(v) if isinstance(v, _basestring): @@ -284,6 +386,15 @@ def _emit(key, value, content_handler, cdata = iv continue if ik.startswith(attr_prefix): + ik = _process_namespace(ik, namespaces, namespace_separator, + attr_prefix) + if ik == '@xmlns' and isinstance(iv, dict): + for k, v in iv.items(): + attr = 'xmlns{}'.format(':{}'.format(k) if k else '') + attrs[attr] = _unicode(v) + continue + if not isinstance(iv, _unicode): + iv = _unicode(iv) attrs[ik[len(attr_prefix):]] = iv continue children.append((ik, iv)) @@ -295,7 +406,8 @@ def _emit(key, value, content_handler, for child_key, child_value in children: _emit(child_key, child_value, content_handler, attr_prefix, cdata_key, depth+1, preprocessor, - pretty, newl, indent) + pretty, 
newl, indent, namespaces=namespaces, + namespace_separator=namespace_separator) if cdata is not None: content_handler.characters(cdata) if pretty and children: @@ -306,6 +418,7 @@ def _emit(key, value, content_handler, def unparse(input_dict, output=None, encoding='utf-8', full_document=True, + short_empty_elements=False, **kwargs): """Emit an XML document for the given `input_dict` (reverse of `parse`). @@ -327,7 +440,10 @@ def unparse(input_dict, output=None, encoding='utf-8', full_document=True, if output is None: output = StringIO() must_return = True - content_handler = XMLGenerator(output, encoding) + if short_empty_elements: + content_handler = XMLGenerator(output, encoding, True) + else: + content_handler = XMLGenerator(output, encoding) if full_document: content_handler.startDocument() for key, value in input_dict.items(): @@ -343,19 +459,26 @@ def unparse(input_dict, output=None, encoding='utf-8', full_document=True, pass return value + if __name__ == '__main__': # pragma: no cover import sys import marshal + try: + stdin = sys.stdin.buffer + stdout = sys.stdout.buffer + except AttributeError: + stdin = sys.stdin + stdout = sys.stdout (item_depth,) = sys.argv[1:] item_depth = int(item_depth) def handle_item(path, item): - marshal.dump((path, item), sys.stdout) + marshal.dump((path, item), stdout) return True try: - root = parse(sys.stdin, + root = parse(stdin, item_depth=item_depth, item_callback=handle_item, dict_constructor=dict)