Source code for inxs

# TODO annotate Callable types with signature and return values
# https://docs.python.org/3.6/library/typing.html?highlight=namedtuple#callable
# TODO delete unneeded symbols in setup functions' locals
# TODO globbing is much less stressing than regular expressions

import logging
import pkg_resources
from collections import ChainMap
from copy import deepcopy
from functools import lru_cache
from os import getenv
from types import SimpleNamespace
from typing import (
    AnyStr,
    Callable,
    Dict,
    Iterator,
    List,
    Mapping,
    Pattern,
    Sequence,
    Union,
)
from typing import Any as AnyType

import cssselect
import dependency_injection
from delb import is_tag_node, TagNode, Document

from inxs.constants import (
    REF_IDENTIFYING_ATTRIBUTE,
    TRAVERSE_BOTTOM_TO_TOP,
    TRAVERSE_DEPTH_FIRST,
    TRAVERSE_LEFT_TO_RIGHT,
    TRAVERSE_RIGHT_TO_LEFT,
    TRAVERSE_ROOT_ONLY,
    TRAVERSE_TOP_TO_BOTTOM,
    TRAVERSE_WIDTH_FIRST,
)


# config


# Version string of the installed "inxs" distribution, as reported by
# pkg_resources at import time.
__version__ = pkg_resources.get_distribution("inxs").version

# Size limit that is handed to ``lru_cache`` for the cached handler factories.
# It is read from the environment variable INXS_HANDLER_CACHE_SIZE and stays
# ``None`` when that variable is not set.
HANDLER_CACHES_SIZE = getenv("INXS_HANDLER_CACHE_SIZE")
HANDLER_CACHES_SIZE = (
    None if HANDLER_CACHES_SIZE is None else int(HANDLER_CACHES_SIZE)
)


# logging


logger = logging.getLogger(__name__)
""" Module logger, configure as you need. """
# Short aliases for the two log levels that are used throughout this module.
dbg, nfo = logger.debug, logger.info


# exceptions


class InxsException(Exception):
    """ Base class for inxs exceptions. """
class FlowControl(InxsException):
    """ Base class for exceptions that control the evaluation of handlers. """

    def __init__(self):
        super().__init__()
        # every instantiation is logged with the concrete subclass' name
        dbg(f"{self.__class__.__name__} is evoked.")
class AbortRule(FlowControl):
    """
    Can be raised to abort the evaluation of all the currently processed
    :class:`inxs.Rule` 's remaining tests and handlers. No further nodes will be
    considered for that rule. This is similar to Python's builtin ``break`` in
    iterations.
    """
class AbortTransformation(FlowControl):
    """ Can be raised to cancel the remaining :term:`transformation steps`. """
class SkipToNextNode(FlowControl):
    """
    Can be raised to abort handling of the current node. This is similar to
    Python's builtin ``continue`` in iterations.
    """
# types


AttributesConditionType = Union[
    Dict[Union[str, Pattern], Union[str, Pattern, None]], Callable
]
ConditionType = Union[Callable, AnyStr, AttributesConditionType]
StepType = Union["Rule", Callable, Sequence["StepType"]]


# helpers


def _condition_factory(condition: ConditionType) -> Callable:
    """
    Generates test functions for conditions provided as string or mapping.

    :param condition: A string shortcut (``/``, ``*``, a namespace URI, a tag's
                      local name, a css selector or an XPath expression), a mapping
                      of attribute constraints or an arbitrary object.
    :returns: The matching test function. Objects that are neither a string nor a
              mapping are returned unchanged.
    """
    if isinstance(condition, str):
        if condition == "/":
            return _is_root_condition
        elif condition == "*":
            return _is_any_node_condition
        elif "://" in condition:
            # assumes URI
            dbg(f"Adding {condition} as namespace condition.")
            return HasNamespace(condition)
        elif condition.isalpha():
            # assumes tag
            dbg(f"Adding {condition} as tag's local name condition.")
            return HasLocalname(condition)

        try:
            # it may be a css selector
            _condition = _css_selector_translator(condition)
        except cssselect.SelectorError:
            # deliberate best-effort: the string isn't a valid css selector, hence
            # it is passed on unaltered and treated as XPath expression below
            pass
        else:
            dbg(
                f"Translated css selector `{condition}` to XPath expression "
                f"{_condition}."
            )
            condition = _condition

        # assumes XPath
        dbg(f"Adding {condition} as XPath condition.")
        return MatchesXPath(condition)

    elif isinstance(condition, Mapping):
        dbg(f"Adding {condition} as attribute condition.")
        return MatchesAttributes(condition)
    else:
        return condition


class _CSSToXPathTranslator(cssselect.GenericTranslator):
    """ A css-to-XPath translator that emits the abbreviated ``//`` axis. """

    def selector_to_xpath(self, *args, **kwargs):
        result = super().selector_to_xpath(*args, **kwargs)
        if result.startswith("descendant-or-self::"):
            # though this should be equivalent, the abbreviated form proved
            # to work in cases where the full wouldn't
            result = result.replace("descendant-or-self::", "//", 1)
        return result


_css_selector_translator = _CSSToXPathTranslator().css_to_xpath


def dot_lookup(obj: AnyType, name: str):
    """
    Looks up the attribute ``name`` from ``obj`` considering nested attributes that
    are separated by a ``.``
    """
    for _name in name.split("."):
        obj = getattr(obj, _name)
    return obj


def _flatten_sequence(seq: Sequence):
    """
    Returns a tuple with the items of ``seq`` where nested sequences are
    recursively replaced by their items. Strings are treated as atomic items.
    """
    result: List = []
    for item in seq:
        if isinstance(item, Sequence) and not isinstance(item, str):
            result.extend(_flatten_sequence(item))
        else:
            result.append(item)
    return tuple(result)


def _is_any_node_condition(_, __):
    """ A condition test that applies to every node. """
    return True


def _is_flow_control(obj: AnyType) -> bool:
    """ Tests whether ``obj`` is a class that derives from :class:`FlowControl`. """
    try:
        return issubclass(obj, FlowControl)
    except TypeError:
        # ``issubclass`` rejects anything that isn't a class, e.g. handler functions
        return False


def _is_root_condition(node: TagNode, transformation: "Transformation"):
    """ A condition test that applies to nodes that have no parent. """
    return node.parent is None


# the decorator that caches the handler factories' results per arguments
singleton_handler = lru_cache(HANDLER_CACHES_SIZE)


# traverser


def traverse_df_ltr_btt(root: TagNode) -> Iterator[TagNode]:
    """ Yields tag nodes depth first, left to right, children before parents. """

    def yield_children(node):
        # the children are collected in a tuple before descending, so the
        # iteration doesn't run over a live view of the node's children
        for child in tuple(node.child_nodes(is_tag_node)):
            yield from yield_children(child)
        yield node

    yield from yield_children(root)


def traverse_df_ltr_ttb(root: TagNode) -> Iterator[TagNode]:
    """ Yields the root, followed by its descendant tag nodes. """
    yield root
    yield from root.child_nodes(is_tag_node, recurse=True)


def traverse_root(root: TagNode) -> Iterator[TagNode]:
    """ Yields nothing but the root node. """
    yield root


# rules definition
def Any(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Returns a callable that evaluates the provided test functions and returns
    ``True`` if any of them returned that.

    :param conditions: The conditions to test, condition shortcuts and nested
                       sequences are resolved.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(node: TagNode, transformation: Transformation) -> bool:
        return any(x(node, transformation) for x in conditions)

    return evaluator
def OneOf(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Returns a callable that evaluates the provided test functions and returns
    ``True`` if exactly one of them returned a true value.

    :param conditions: The conditions to test, condition shortcuts and nested
                       sequences are resolved.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(node: TagNode, transformation: Transformation) -> bool:
        # Results are judged by their truthiness, like everywhere else where
        # conditions are tested; comparing with ``True`` would ignore matches that
        # are reported with other true values. All conditions are evaluated.
        return sum(1 for x in conditions if x(node, transformation)) == 1

    return evaluator
def Not(*conditions: Sequence[ConditionType]) -> Callable:
    """
    Returns a callable that evaluates the provided test functions and returns
    ``True`` if none of them returned a true value, hence ``False`` as soon as any
    of them matched.

    :param conditions: The conditions to test, condition shortcuts and nested
                       sequences are resolved.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(node: TagNode, transformation: Transformation) -> bool:
        return not any(x(node, transformation) for x in conditions)

    return evaluator
@singleton_handler
def HasNamespace(namespace: AnyStr) -> Callable:
    """
    Returns a callable that tests a node for the given tag namespace.

    :param namespace: The namespace a node's ``namespace`` attribute is compared
                      with.
    """

    def evaluator(node: TagNode, _) -> bool:
        return node.namespace == namespace

    return evaluator
@singleton_handler
def HasLocalname(name: AnyStr) -> Callable:
    """
    Returns a callable that tests a node for the given local tag name.

    :param name: The name a node's ``local_name`` attribute is compared with.
    """

    def evaluator(node: TagNode, _) -> bool:
        return node.local_name == name

    return evaluator
@singleton_handler
def MatchesXPath(xpath: Union[str, Callable]) -> Callable:
    """
    Returns a callable that tests a node for the given XPath expression (whether
    the evaluation result on the :term:`transformation root` contains it). If the
    ``xpath`` argument is a callable, it will be called with the current
    transformation as argument to obtain the expression.

    :param xpath: The expression or a callable that returns it.
    """

    def callable_evaluator(node: TagNode, transformation: Transformation) -> bool:
        # the expression is resolved anew on each evaluation
        _xpath = xpath(transformation)
        dbg(f"Resolved XPath from callable: '{_xpath}'")
        return node in transformation.root.xpath(_xpath)

    def string_evaluator(node: TagNode, transformation: Transformation) -> bool:
        return node in transformation.root.xpath(xpath)

    return callable_evaluator if callable(xpath) else string_evaluator
def MatchesAttributes(constraints: AttributesConditionType) -> Callable:
    """
    Returns a callable that tests a node's attributes for constraints defined in a
    :term:`mapping`. All constraints must be matched to resolve as true. Expected
    keys and values can be provided as string or compiled regular expression object
    from the :mod:`re` module. A ``None`` as value constraint evaluates as true if
    the key is in the attributes regardless its value. It also implies that at least
    one attribute must match the key's constraint if this one is a regular
    expression object. Alternatively a callable can be passed that returns such
    mappings during the transformation.

    :param constraints: The mapping of constraints or a callable that returns such
                        when it is called with the transformation.
    """

    def callable_evaluator(node: TagNode, transformation: Transformation):
        # the constraints are resolved anew on each evaluation and then delegated
        # to an evaluator for the resulting mapping
        _constraints = constraints(transformation)
        dbg(f"Resolved attributes' constraints from callable: '{_constraints}'")
        return MatchesAttributes(_constraints)(node, transformation)

    if callable(constraints):
        return callable_evaluator

    # the constraints are partitioned once, not per evaluated node
    key_only_constraints = [k for k, v in constraints.items() if v is None]
    key_string_constraints = {
        k: v for k, v in constraints.items() if isinstance(k, str) and v is not None
    }
    key_re_constraints = {
        k: v
        for k, v in constraints.items()
        if isinstance(k, Pattern) and v is not None
    }

    def evaluator(node: TagNode, _) -> bool:
        attributes = node.attributes
        if constraints and not attributes:
            return False

        # check the presence of keys
        for key_constraint in key_only_constraints:
            if isinstance(key_constraint, str) and key_constraint not in attributes:
                return False
            elif isinstance(key_constraint, Pattern) and not any(
                key_constraint.match(key) for key in attributes.keys()
            ):
                return False

        value_string_constraints, value_re_constraints = {}, {}

        # check attributes' keys with string constraints
        for key_constraint, value_constraint in key_string_constraints.items():
            if key_constraint not in attributes:
                return False
            if isinstance(value_constraint, str):
                value_string_constraints[key_constraint] = value_constraint
            elif isinstance(value_constraint, Pattern):
                value_re_constraints[key_constraint] = value_constraint

        # check attributes' keys with regular expression constraints
        for key_constraint, value_constraint in key_re_constraints.items():
            for attribute in (x for x in attributes if key_constraint.match(x)):
                if isinstance(value_constraint, str):
                    value_string_constraints[attribute] = value_constraint
                elif isinstance(value_constraint, Pattern):
                    value_re_constraints[attribute] = value_constraint

        # check attributes' values
        for key, constraint in value_string_constraints.items():
            if attributes[key] != constraint:
                return False
        for key, constraint in value_re_constraints.items():
            if not constraint.match(attributes[key]):
                return False

        return True

    return evaluator
@singleton_handler
def Ref(name: str) -> Callable:
    """
    Returns a callable that can be used for value resolution in a condition test or
    :term:`handler function` that supports such. The value will be looked up during
    the processing of a transformation in :attr:`Transformation._available_symbols`
    by the given ``name``. This allows to reference dynamic values in
    :term:`transformation steps` and :class:`Rule` s.

    :param name: The symbol's name. Nested attributes of the symbol can be addressed
                 with a ``.`` as separator.
    """

    def simple_resolver(transformation: Transformation) -> AnyType:
        # this must be an f-string, otherwise the placeholder itself is logged
        dbg(f"Resolving {name}.")
        return transformation._available_symbols[name]

    # the resolvers are tagged with an attribute to be identifiable as references
    setattr(simple_resolver, REF_IDENTIFYING_ATTRIBUTE, None)

    def dot_resolver(transformation: Transformation) -> AnyType:
        dbg(f"Resolving {name}.")
        token = name.split(".")
        # the first token is a symbol's name, the remaining ones are looked up as
        # attributes
        obj = transformation._available_symbols[token[0]]
        for _name in token[1:]:
            obj = getattr(obj, _name)
        return obj

    setattr(dot_resolver, REF_IDENTIFYING_ATTRIBUTE, None)

    return dot_resolver if "." in name else simple_resolver
def If(x: AnyType, operator: Callable, y: AnyType) -> Callable:
    """
    Returns a callable that can be used as condition test in a :class:`Rule`.
    The arguments ``x`` and ``y`` can be given as callables that will be used to get
    the ``operator``'s input values during execution.
    Before you implement your own operators, mind that there are a lot available
    within Python's ``__builtins__`` and the standard library, in particular the
    :mod:`operator` module.

    Examples:

    >>> If(Ref('previous_result'), operator.is_not, None)  # doctest: +SKIP

    :param x: The first operand or a callable that returns it.
    :param operator: A callable that is called with both resolved operands.
    :param y: The second operand or a callable that returns it.
    :returns: A test function that returns the ``operator``'s result.
    """
    # TODO allow single arguments
    # TODO? allow primitive expressions for stdlib.operator's members

    def resolve(label: str, operand: AnyType, transformation: AnyType) -> AnyType:
        # Plain values are used as they are, callables are invoked with those of
        # the transformation's symbols that they ask for.
        if not callable(operand):
            return operand
        value = operand(
            **dependency_injection.resolve_dependencies(
                operand, transformation._available_symbols
            ).as_kwargs
        )
        dbg(f"{label} resolved to '{value}'")
        return value

    def evaluator(_, transformation: "Transformation") -> AnyType:
        # x is resolved before y
        _x = resolve("x", x, transformation)
        _y = resolve("y", y, transformation)
        return operator(_x, _y)

    return evaluator
class Rule:
    """
    Instances of this class can be used as conditional :term:`transformation steps`
    that are evaluated against all traversed nodes.

    :param conditions: All given conditions must evaluate as ``True`` in order for
                       this rule to be applied.
                       Strings and mappings can be provided as shortcuts, see
                       :ref:`rule_condition_shortcuts` for details.
                       The condition test functions are always called with the
                       currently evaluated ``node`` and the :class:`Transformation`
                       instance as arguments.
                       There are helper functions for grouping conditions logically:
                       :func:`Any`, :func:`Not` and :func:`OneOf`.
    :type conditions: A single callable, string or mapping, or a :term:`sequence` of
                      such.
    :param handlers: These handlers will be called if the conditions matched. They
                     can take any argument whose name is available in
                     :attr:`Transformation._available_symbols`.
    :type handlers: A single callable or a :term:`sequence` of such.
    :param name: The optional rule's name.
    :type name: String.
    :param traversal_order: An optional traversal order that overrides the
                            transformation's default
                            :attr:`Transformation.config.traversal_order`, see
                            :ref:`traversal_strategies` for details.
    :type traversal_order: Integer.
    """

    __slots__ = ("name", "conditions", "handlers", "traversal_order")

    def __init__(
        self,
        conditions: Union[ConditionType, Sequence[ConditionType]],
        handlers: Union[Callable, Sequence[Callable]],
        name: str = None,
        traversal_order: int = None,
    ) -> None:

        self.name: str = name
        dbg(f"Initializing rule '{name}'.")

        # a single condition is wrapped, mind that strings are sequences as well
        if not isinstance(conditions, Sequence) or isinstance(conditions, str):
            conditions = (conditions,)
        conditions = _flatten_sequence(conditions)
        self.conditions = tuple(_condition_factory(x) for x in conditions)

        # a root condition is not tested per node, instead the traversal is
        # restricted to the root
        if _is_root_condition in self.conditions:
            traversal_order = TRAVERSE_ROOT_ONLY
            self.conditions = tuple(
                x for x in self.conditions if x is not _is_root_condition
            )

        if not isinstance(handlers, Sequence):
            handlers = (handlers,)
        self.handlers = _flatten_sequence(handlers)
        self.traversal_order = traversal_order
class Once(Rule):
    """
    This is a variant of :class:`Rule` that is only applied on the first match.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the appended flow control ends the rule's evaluation after the other
        # handlers were applied
        self.handlers += (AbortRule,)
# transformation
class Transformation:
    """
    A transformation instance is defined by its :term:`transformation steps` and
    :term:`configuration`. It is to be called with a :class:`delb.Document` or
    :class:`delb.TagNode` instance as :term:`transformation root`, only this node (or
    the root node of a ``Document``) and its children will be considered during
    traversal.

    :param steps: The designated transformation steps of the instance are given as a
                  sequence of positional arguments.
    :param config: The configuration values for the instance are passed as keyword
                   arguments. Beside the following keywords, it can be populated
                   with any key-value-pairs that will be available in
                   :attr:`inxs.Transformation._available_symbols` during a
                   transformation.
                   The defaults are defined in :attr:`~inxs.config_defaults`.

                   - ``context`` can be provided as mapping with items that are
                     added to the :term:`context` before a (sub-)document is
                     processed.
                   - ``common_rule_conditions`` can be used to define one or more
                     conditions that must match in all rule evaluations. E.g. a
                     transformation could be restricted to nodes with a certain
                     namespace without redundantly defining that per rule. Can be
                     given as a single object (e.g. a string) or as sequence.
                   - ``copy`` is a boolean that defaults to ``True`` and indicates
                     whether to process on a copy of the document's tree object.
                   - ``name`` can be used to identify a transformation.
                   - ``result_object`` sets the transformation's attribute that is
                     returned as result. Dot-notation lookup (e.g.
                     ``context.target``) is implemented. Per default the
                     :term:`transformation root` is returned.
                   - ``traversal_order`` sets the default traversal order for rule
                     evaluations and itself defaults to depth first, left to right,
                     top to bottom. See :ref:`traversal_strategies` for possible
                     values.
    """

    __slots__ = ("config", "steps", "states")

    config_defaults = {
        "common_rule_conditions": None,
        "context": {},
        "copy": True,
        "name": None,
        "result_object": "root",
        "traversal_order": (
            TRAVERSE_DEPTH_FIRST | TRAVERSE_LEFT_TO_RIGHT | TRAVERSE_TOP_TO_BOTTOM
        ),
    }
    """
    The default :term:`configuration` values. Changing members on an instance
    actually affects the class unless a copy of this mapping is copied and bound as
    instance attribute.
    """

    # maps the supported traversal orders to their implementations
    traversers = {
        TRAVERSE_DEPTH_FIRST
        | TRAVERSE_LEFT_TO_RIGHT
        | TRAVERSE_BOTTOM_TO_TOP: traverse_df_ltr_btt,
        TRAVERSE_DEPTH_FIRST
        | TRAVERSE_LEFT_TO_RIGHT
        | TRAVERSE_TOP_TO_BOTTOM: traverse_df_ltr_ttb,
        TRAVERSE_ROOT_ONLY: traverse_root,
    }

    def __init__(self, *steps: StepType, **config: AnyType) -> None:
        dbg(f"Initializing transformation instance named: '{config.get('name')}'.")
        self.steps = _flatten_sequence(steps)
        self.config = SimpleNamespace(**config)
        self._set_config_defaults()
        self._expand_rules_conditions()
        self._validate_steps()
        # the states only exist while the transformation is processing
        self.states = None

    @property
    def name(self):
        """ The ``name`` member of the transformation's :term:`configuration`. """
        return getattr(self.config, "name", None)

    def _expand_rules_conditions(self):
        """ Prepends the common rule conditions to the conditions of all rules. """
        common_rule_conditions = self.config.common_rule_conditions
        if common_rule_conditions is None:
            return

        if not isinstance(common_rule_conditions, Sequence) or isinstance(
            common_rule_conditions, str
        ):
            common_rule_conditions = (common_rule_conditions,)
        # Rules hold their conditions as tuple, any other sequence type (e.g. a
        # list) could not be concatenated with them.
        common_rule_conditions = tuple(common_rule_conditions)

        expanded_steps = []
        for step in self.steps:
            if isinstance(step, Rule):
                expanded_steps.append(
                    Rule(
                        common_rule_conditions + step.conditions,
                        step.handlers,
                        step.name,
                        step.traversal_order,
                    )
                )
            else:
                expanded_steps.append(step)
        self.steps = tuple(expanded_steps)

    def _set_config_defaults(self) -> None:
        """ Sets the defaults for all configuration keys that were not provided. """
        for key, value in self.config_defaults.items():
            if not hasattr(self.config, key):
                dbg(f"Using default value '{value}' for config key '{key}'.")
                setattr(self.config, key, value)

    def _validate_steps(self):
        """ Ensures that all steps are either callables or rules. """
        # NOTE(review): assertions are not evaluated when Python runs with -O; it is
        # kept as callers may rely on the AssertionError
        assert all(
            isinstance(x, (Callable, Rule)) for x in self.steps
        ), "Transformation steps must be either a `Rule` instance or a callable."

    def __call__(
        self, input: Union[Document, TagNode], copy: bool = None, **context: AnyType
    ) -> AnyType:
        """
        Applies the transformation steps on the given :term:`transformation root`.

        :param input: The document or node to transform.
        :param copy: Overrides the configuration's ``copy`` value unless it is
                     ``None``.
        :param context: Items that are added to the configured :term:`context`.
        :returns: The object that the configuration's ``result_object`` refers to
                  or ``None`` if that isn't set. A resulting root node is wrapped in
                  a ``Document`` if the transformation was called with one.
        :raises TypeError: If ``input`` is neither a ``Document`` nor a ``TagNode``.
        """
        copy = self.config.copy if copy is None else copy
        self._init_transformation(input, copy, context)

        for step in self.steps:
            if hasattr(step, "name"):
                _step_name = step.name
            else:
                # not all callables have a name, e.g. ``functools.partial`` objects
                _step_name = getattr(step, "__name__", repr(step))
            dbg(f"Processing rule '{_step_name}'.")

            self.states.current_step = step
            try:
                if isinstance(step, Rule):
                    self._apply_rule(step)
                else:
                    self._apply_handlers(step)
            except AbortTransformation:
                dbg("Aborting due to 'AbortTransformation'.")
                break

        if self.config.result_object:
            result = dot_lookup(self, self.config.result_object)
            if self.config.result_object == "root" and isinstance(input, Document):
                result = Document(result)
        else:
            result = None
        self._finalize_transformation()
        return result

    def _init_transformation(
        self,
        input: Union[Document, TagNode],
        copy: bool,
        context: Dict[AnyStr, AnyType],
    ) -> None:
        """ Sets up the states and symbols for processing ``input``. """
        dbg("Initializing processing.")

        if not isinstance(input, (Document, TagNode)):
            raise TypeError(
                "A transformation must be called with a Document or TagNode instance, "
                f"got a {type(input)}."
            )

        self.states = SimpleNamespace()
        self.states.current_node = None
        self.states.previous_result = None

        # the configured context is copied, so that a transformation can't alter
        # the configuration's (or the class' default) mapping
        resolved_context = deepcopy(self.config.context)
        resolved_context.update(context)
        dbg(f"Initial context:\n{resolved_context}")
        self.states.context = SimpleNamespace(**resolved_context)

        if isinstance(input, Document):
            if copy:
                dbg("Cloning source.")
                input = input.clone()
            self.states.root = input.root
        else:
            if copy:
                dbg("Cloning source.")
                input = input.clone(deep=True)
            self.states.root = input

        static_symbols = {
            "config": self.config,
            "context": self.states.context,
            "nsmap": self.states.root.namespaces,
            "root": self.states.root,
            "transformation": self,
        }
        self.states.dynamic_symbols = {}
        # earlier mappings take precedence in lookups
        self.states.symbols_chain = ChainMap(
            self.states.dynamic_symbols,
            static_symbols,
            self.states.context.__dict__,
            self.config.__dict__,
        )

    def _apply_rule(self, rule: Rule) -> None:
        """ Evaluates the rule against all nodes that its traverser yields. """
        traverser = self._get_traverser(rule.traversal_order)
        dbg(f"Using traverser: {traverser}")

        for node in traverser(self.states.root):
            dbg(f"Evaluating {node}.")
            self.states.current_node = node
            try:
                if self._test_conditions(node, rule.conditions):
                    self._apply_handlers(*rule.handlers)
            except AbortRule:
                dbg("Aborting rule.")
                break
            except SkipToNextNode:
                dbg("Skipping to next node.")
                continue

        self.states.current_node = None

    def _get_traverser(self, traversal_order: Union[int, None]) -> Callable:
        """
        Returns the traverser for the given order, respectively for the configured
        default if that is ``None``.

        :raises NotImplementedError: If there's no traverser for the order.
        """
        # This is deliberately not cached: a cache on a method would keep every
        # instance alive and stick to a superseded default from the configuration,
        # while there's nothing to gain over two dictionary lookups.
        if traversal_order is None:
            traversal_order = self.config.traversal_order
        traverser = self.traversers.get(traversal_order)
        if traverser is None:
            raise NotImplementedError(
                f"There is no traverser for the traversal order {traversal_order!r}."
            )
        return traverser

    def _test_conditions(self, node: TagNode, conditions: Sequence[Callable]) -> bool:
        """ Tests whether all conditions apply to the node, stops at the first miss. """
        # there's no dependency injection here because its overhead
        # shall be avoided during testing conditions
        for condition in conditions:
            dbg(f"Testing condition '{condition}'.")
            if not condition(node, self):
                dbg("The condition did not apply.")
                return False
            dbg("The condition applied.")
        return True

    def _apply_handlers(self, *handlers: Union[Callable, Exception]) -> None:
        """
        Calls the handlers with the arguments that they ask for by name and keeps
        each result as ``previous_result``. Flow control classes are raised.
        """
        dbg("Applying handlers.")
        for handler in handlers:
            if _is_flow_control(handler):
                raise handler

            kwargs = dependency_injection.resolve_dependencies(
                handler, self._available_symbols
            ).as_kwargs
            if isinstance(handler, Transformation):
                # The current node must be tested for identity with None; its
                # truthiness may depend on its contents, so that the root would be
                # passed instead of a node that evaluates as false.
                current_node = self.states.current_node
                kwargs["input"] = (
                    current_node if current_node is not None else self.states.root
                )
                # sub-transformations operate on the very same tree
                kwargs["copy"] = False
            dbg(f"Applying handler {handler}.")
            self.states.previous_result = handler(**kwargs)

    def _finalize_transformation(self) -> None:
        """ Discards the states of the finished transformation. """
        dbg("Finalizing processing.")
        self.states = None

    @property
    def _available_symbols(self) -> Mapping:
        """
        This mapping contains items that are used for the dependency injection of
        handler functions. These names are included:

        - All attributes of the transformation's :term:`configuration`, overridden
          by the following.
        - All attributes of the transformation's :term:`context`, overridden by the
          following.
        - ``config`` - The :term:`configuration` namespace object.
        - ``context`` - The :term:`context` namespace object.
        - ``node`` - The node that matched a :class:`Rule`'s conditions or ``None``
          in case of simple :term:`transformation steps`.
        - ``previous_result`` - The result that was returned by the previously
          evaluated handler function.
        - ``root`` - The root node of the processed (sub-)document a.k.a.
          :term:`transformation root`.
        - ``transformation`` - The calling :class:`Transformation` instance.
        """
        # the dynamic symbols are refreshed on each access
        self.states.dynamic_symbols.update(
            {
                "node": self.states.current_node,
                "previous_result": self.states.previous_result,
            }
        )
        return self.states.symbols_chain

    # aliases that are supposed to be broken when the transformation isn't processing

    @property
    def context(self):
        """
        This property can be used to access the :term:`context` while the
        transformation is processing.
        """
        return self.states.context

    @property
    def root(self):
        """
        This property can be used to access the root node of the currently processed
        (sub-)document.
        """
        return self.states.root
# The public API of the package: metadata, the traversal constants, the flow
# control exceptions, the condition helpers and the rule and transformation
# classes.
__all__ = [
    "__version__",
    "logger",
    "TRAVERSE_BOTTOM_TO_TOP",
    "TRAVERSE_DEPTH_FIRST",
    "TRAVERSE_LEFT_TO_RIGHT",
    "TRAVERSE_RIGHT_TO_LEFT",
    "TRAVERSE_ROOT_ONLY",
    "TRAVERSE_TOP_TO_BOTTOM",
    "TRAVERSE_WIDTH_FIRST",
    "AbortRule",
    "AbortTransformation",
    "SkipToNextNode",
    "InxsException",
    "Any",
    "Not",
    "OneOf",
    "HasNamespace",
    "HasLocalname",
    "MatchesAttributes",
    "MatchesXPath",
    "If",
    "Ref",
    "Rule",
    "Once",
    "Transformation",
]