Source code for sheraf.queryset

import itertools
import operator
from collections import OrderedDict
from collections.abc import Iterable
from collections.abc import Iterator

import sheraf.constants
from sheraf.exceptions import InvalidFilterException
from sheraf.exceptions import InvalidOrderException
from sheraf.tools.more_itertools import unique_everseen


[docs]class QuerySet: """ A :class:`~sheraf.queryset.QuerySet` is a collection containing :class:`~sheraf.models.Model` instances. Like in a regular :class:`set`, objects are unique, but the main difference is that :class:`~sheraf.queryset.QuerySet` keeps the insertion order. :param iterable: A collection of models. If `iterable` is None, then `model` must be set. :type iterable: Iterable :param model_class: A model class to iterate over. If `model` is None, then `iterable` must be set. If both are set, `model_class` is ignored. :type reversed_iterable: A Model class :param predicate: a callable takes an instance as parameter and return a Boolean (if True, the instance will be returned in the iteration). If None, everything is returned. :type predicate: Predicate :param kwargs: A dictionnary containing the values expected from the model parameters. If `kwargs` is `{"foo": "bar"}` then the queryset will only contains models which attribute `foo` is `"bar"`. For the following examples, let us work with a simple **Cowboy** model. For the sake of simplicity we use a :class:`~sheraf.models.IntOrderedNamedAttributesModel` so the first instance created will have id 0, the second will have id 1 and so on... >>> class Cowboy(sheraf.IntOrderedNamedAttributesModel): ... table = "queryset_people" ... name = sheraf.SimpleAttribute() ... age = sheraf.SimpleAttribute() ... >>> with sheraf.connection(commit=True): ... peter = Cowboy.create(name="Peter", age=30) ... steven = Cowboy.create(name="Steven", age=30) ... george = Cowboy.create(name="George Abitbol", age=50) :class:`~sheraf.queryset.QuerySet` are mostly created by doing requests on a Model with :func:`~sheraf.models.indexation.IndexedModel.all`, :func:`~sheraf.models.indexation.IndexedModel.filter` or :func:`~sheraf.models.indexation.IndexedModel.order`, but can also be initialized with custom data. >>> with sheraf.connection(): ... a = Cowboy.all() # returns a QuerySet with all Cowboy ... b = QuerySet([peter, steven, george]) # This is an equivalent custom QuerySet ... assert list(a) == list(b) :class:`~sheraf.queryset.QuerySet` behave like iterators, and can only be consumed once. >>> with sheraf.connection(): ... everybody = Cowboy.all() ... assert [peter, steven, george] == list(everybody) ... assert [] == everybody .. note :: :class:`~sheraf.queryset.QuerySet` can be compared against anything iterable, but the comparison will consume the :class:`~sheraf.queryset.QuerySet`. :class:`~sheraf.queryset.QuerySet` keeps the order of insertion. >>> assert QuerySet([peter, steven]) != QuerySet([steven, peter]) :class:`~sheraf.queryset.QuerySet` supports slicing. Slices returns another :class:`~sheraf.queryset.QuerySet`. >>> with sheraf.connection(): ... assert peter == Cowboy.all()[0] ... assert QuerySet([peter, steven]) == Cowboy.all()[0:2] ... assert QuerySet([peter, steven, george]) == Cowboy.all()[0:] """ def __init__( self, iterable=None, model_class=None, predicate=None, primary_key=None, **kwargs, ): self.filters = OrderedDict(kwargs) self._iterable = iterable self._iterator = None self._predicate = predicate self._start = None self._stop = None self._step = None self.model = model_class if primary_key: self.primary_key = primary_key elif model_class: self.primary_key = model_class.primary_key() self.orders = OrderedDict() if iterable is None and model_class is None: self._iterable = [] self.__iter__() def __iter__(self): return self def __repr__(self): result = "<QuerySet" if self.model: result = f"{result} model={self.model.__name__}" if self._iterable: result = f"{result} iterable={self._iterable}" if self._predicate: result = f"{result} predicate={self._predicate}" if self.filters: filters = ", ".join(f"{k}={v[1]}" for k, v in self.filters.items()) result = f"{result} filters=({filters})" result = f"{result}>" return result def __next__(self): if not self._iterator: self._init_iterator() return next(self._iterator) def __eq__(self, other): if isinstance(other, Iterable): return all( a == b for a, b in itertools.zip_longest( iter(self), iter(other), fillvalue=object() ) ) return super().__eq__(other) def __and__(self, other): return QuerySet( set(self) & set(other), self.model if self.model == other.model else None, ) def __or__(self, other): return QuerySet( set(self) | set(other), self.model if self.model == other.model else None, ) def __xor__(self, other): return QuerySet( set(self) ^ set(other), self.model if self.model == other.model else None, ) def __add__(self, other): return QuerySet( unique_everseen(itertools.chain(self, other)), self.model if self.model == other.model else None, ) def __bool__(self): try: next(self.copy()) self._iterator = None return True except StopIteration: return False def __getitem__(self, item): qs = self.copy() if isinstance(item, slice): qs._start, qs._stop, qs._step = item.start, item.stop, item.step return qs else: qs._start, qs._stop, qs._step = item, item + 1, 1 return next(qs) def __len__(self): """ :return: The number of objects in the :class:`~sheraf.queryset.QuerySet`. >>> with sheraf.connection(): ... assert Cowboy.create() ... qs = Cowboy.all() ... assert len(qs) == 1 """ # No shortcut possible when there is a predicate, # or when there are several filters if self._predicate or not self.model or len(self.filters) > 1: return sum(1 for _ in self.copy()) # We basically want to search all the models if not self.filters: return self.model.count() # Take the unique filter, and hope it is indexed index_name = list(self.filters.keys())[0] _, filter_value, filter_search_func = list(self.filters.values())[0] if index_name not in self.model.indexes: return sum(1 for _ in self.copy()) index = self.model.indexes[index_name] if filter_search_func: index_values = index.details.call_search_func(self.model, filter_value) else: index_values = filter_value if index.details.unique: return sum(int(index.has_item(v)) for v in index_values) else: return sum( len(index.get_item(v)) for v in index_values if index.has_item(v) ) @property def indexed_filters(self): return [ ( name, value, search_func, self.orders.get(name) == sheraf.constants.DESC, ) for (name, value, search_func) in self.filters.values() if name in self.model.indexes and self.model.indexes[name].details.auto ] @property def non_indexed_filters(self): return [ (name, value) for (name, value, _) in self.filters.values() if (name not in self.model.indexes and name in self.model.attributes) or ( name in self.model.indexes and not self.model.indexes[name].details.auto ) ] @property def first_indexed_order(self): for name, value in self.orders.items(): if ( self.model and name in self.model.indexes and self.model.indexes[name].details.auto ): return name, value def get_index_keys(self, index, filter_value, search_func, reverse): if not filter_value: return index.iterkeys(reverse=reverse) if search_func: return index.details.call_search_func(self.model, filter_value) return [filter_value] def _objects_ids(self, index_name, filter_value, search_func, reverse): index = self.model.indexes[index_name] keys = self.get_index_keys(index, filter_value, search_func, reverse) if index.details.unique: mappings = (index.get_item(key, True) for key in keys) else: mappings_lists = (index.get_item(key, True) for key in keys) mappings = (m for l in mappings_lists if l for m in l) return (m[self.model.primary_key()] for m in mappings if m) def _multiple_indexes_iterator(self): ids_sets = (set(self._objects_ids(*index)) for index in self.indexed_filters) raw_ids = set.intersection(*ids_sets) pk_attribute = self.model.attributes[self.model.primary_key()] ids = (pk_attribute.deserialize(id_) for id_ in raw_ids) return ids def _single_index_iterator( self, index_name, filter_value=None, search_func=None, reverse=False ): """ Returns an iterator over one indexed attribute. """ pk_attribute = self.model.attributes[self.model.primary_key()] raw_ids = self._objects_ids(index_name, filter_value, search_func, reverse) unique_raw_ids = unique_everseen(raw_ids) ids = (pk_attribute.deserialize(id_) for id_ in unique_raw_ids) return ids def _primary_index_iterator(self): pk_index = self.model.indexes[self.primary_key] pk_attribute = self.model.attributes[self.model.primary_key()] reverse = self.orders.get(self.primary_key) == sheraf.constants.DESC if self.primary_key == self.model.primary_key(): ids = pk_index.iterkeys(reverse) elif self.model.indexes[self.primary_key].details.unique: ids = ( pk_attribute.deserialize(mapping[self.model.primary_key()]) for mapping in pk_index.itervalues(reverse) ) else: ids = ( pk_attribute.deserialize(id) for mappings in pk_index.itervalues(reverse) for id in mappings.keys() ) return ids def _init_iterator(self): if self._iterable: iterator = iter(self._iterable) elif not self.model: iterator = iter([]) # iterator on the first indexed filtered attribute elif self.indexed_filters: iterator = self._single_index_iterator(*self.indexed_filters[0]) # iterator on the first indexed orderde attribute elif self.first_indexed_order: iterator = self._single_index_iterator( index_name=self.first_indexed_order[0], reverse=self.first_indexed_order[1], ) # iterate all items on the primary key else: iterator = self._primary_index_iterator() # instanciate model objects if self.model: iterator = ( self.model.read(key) if not isinstance(key, self.model) else key for key in iterator ) # Checks the models fits all the filters iterator = ( model for model in iterator if self._model_has_expected_values(model) ) # Successively sorts the list from the less important # order to the most important order. already_ordered = ( not self.indexed_filters and self.first_indexed_order and len(self.orders) == 1 ) if self.orders and not already_ordered: iterable = iterator for attribute, order in reversed(self.orders.items()): # those calls to 'sorted' have a HUGE impact on read perfs iterable = sorted( iterable, key=operator.attrgetter(attribute), reverse=(order == sheraf.constants.DESC), ) iterator = iter(iterable) # Only select a slice of the wanted models if self._start is not None or self._stop is not None or self._step is not None: iterator = itertools.islice(iterator, self._start, self._stop, self._step) self._iterator = iterator def _model_has_expected_values(self, model): if not all( getattr(model, filter_name) == expected_value for filter_name, expected_value in self.non_indexed_filters ): return False if not all( ( set(model.indexes[name].details.call_search_func(model, value)) & set(model.indexes[name].details.get_model_index_keys(model)) ) if search_func else (value in model.indexes[name].details.get_model_index_keys(model)) for name, value, search_func, _ in self.indexed_filters ): return False return not self._predicate or self._predicate(model) def count(self): return len(self)
[docs] def copy(self): """Copies the :class:`~sheraf.queryset.QuerySet` without consuming it. >>> with sheraf.connection(): ... peter = Cowboy.create(name="Peter") ... steven = Cowboy.create(name="Steven") ... george = Cowboy.create(name="George") ... qall = Cowboy.all() ... qcopy = qall.copy() ... ... assert [peter, steven, george] == qall ... # now qall is consumed ... ... assert [peter, steven, george] == qcopy ... # now qcopy is consumed """ if isinstance(self._iterable, Iterator): self._iterable, iterable = itertools.tee(self._iterable) else: iterable = self._iterable qs = QuerySet(iterable, self.model) qs.filters = self.filters.copy() qs.orders = self.orders.copy() qs._predicate = self._predicate return qs
[docs] def delete(self): """Delete the objects contained in the queryset. Avoids problems when itering on deleted objects. """ identifiers = [(m.__class__, m.identifier) for m in self] for klass, identifier in identifiers: klass.read(identifier).delete()
[docs] def filter(self, predicate=None, **kwargs): """Refine a copy of the current :class:`~sheraf.queryset.QuerySet` with further tests. :param predicate: filter instance by returning a truthy value. If `None` everything is selected. :type predicate: callable object :param kwargs: A dictionnary containing the values expected from the model parameters. If ``kwargs`` is ``{"foo": "bar"}`` then the queryset will only contains models which attribute ``foo`` is ``"bar"``. :type kwargs: A dictionary which keys must be valid attributes of the model iterated. :return: A copy of the current :class:`~sheraf.queryset.QuerySet` refined with further tests. :return type: :class:`~sheraf.queryset.QuerySet` It is possible to chain :func:`~sheraf.queryset.QuerySet.filter` calls: >>> with sheraf.connection(): ... assert Cowboy.filter(name="George Abitbol", age=50) == \\ ... Cowboy.filter(name="George Abitbol").filter(age=50) ... >>> with sheraf.connection(): ... assert Cowboy.filter(lambda person: "Abitbol" in person.name, age=50) == \\ ... Cowboy.filter(lambda person: "Abitbol" in person.name).filter(age=50) An attribute cannot be filtered twice: >>> with sheraf.connection(): ... Cowboy.filter(age=30).filter(age=40) Traceback (most recent call last): ... sheraf.exceptions.InvalidFilterException: Some filter parameters appeared twice .. note:: Filtering on indexed attributes is more performant than filtering on non-indexed attributes. See :func:`~sheraf.attributes.Attribute.index`. """ return self._filter(False, predicate=predicate, **kwargs)
[docs] def search(self, **kwargs): """ Refine a copy of the current :class:`~sheraf.queryset.QuerySet` with further tests. This method is very similar to :func:`~sheraf.queryset.QuerySet.filter` except the values it takes are transformed with the same way values are transformed at indexation. TODO: pas très clair For instance, if an attribute indexes its values with a lowercase search_func, the :func:`~sheraf.queryset.QuerySet.search` attributes will go through the same search_func. Hence it allows to pass uppercase filter values, while :func:`~sheraf.queryset.QuerySet.filter` does not allow this. >>> class MyCustomModel(sheraf.Model): ... table = "my_custom_model" ... my_attribute = sheraf.SimpleAttribute().index( ... index_keys_func=lambda string: {string.lower()} ... ) ... >>> with sheraf.connection(commit=True): ... m = MyCustomModel.create(my_attribute="FOO") ... >>> with sheraf.connection(): ... assert [m] == MyCustomModel.search(my_attribute="foo") ... assert [m] == MyCustomModel.filter(my_attribute="foo") ... ... assert [m] == MyCustomModel.search(my_attribute="FOO") ... assert [] == MyCustomModel.filter(my_attribute="FOO") """ return self._filter(True, **kwargs)
def _filter(self, search_func, predicate=None, **kwargs): qs = self.copy() if self.model: for filter_name in kwargs.keys(): if ( filter_name not in self.model.attributes and filter_name not in self.model.indexes ): raise sheraf.exceptions.InvalidFilterException( "{} has no attribute {}".format( self.model.__name__, filter_name ) ) kwargs_values = OrderedDict( { filter_name: (filter_name, filter_value, search_func) for filter_name, filter_value in kwargs.items() } ) common_attributes = set(qs.filters) & set(kwargs_values) invalid_common_attributes = any( key for key in common_attributes if qs.filters[key] != kwargs_values[key] ) if invalid_common_attributes: raise InvalidFilterException("Some filter parameters appeared twice") qs.filters.update(kwargs_values) if not qs._predicate: qs._predicate = predicate elif predicate: old_predicate = qs._predicate qs._predicate = lambda m: old_predicate(m) and predicate(m) return qs
[docs] def order(self, *args, **kwargs): """Copies the current :class:`~sheraf.queryset.QuerySet` and adds more order to it. :param args: There can be only one positionnal argument. Choose to iterate over ids in an ascending or a descending way. :type args: ``sheraf.ASC`` or ``sheraf.DESC`` :param kwargs: Further parameters will set an order on the matching model attributes. :type kwargs: A dictionary which keys must be valid attributes of the model iterated, and the values must be ``sheraf.ASC`` or ``sheraf.DESC`` :return: A copy of the current :class:`~sheraf.queryset.QuerySet` with refined order. :return type: :class:`~sheraf.queryset.QuerySet` The default order is the ascending model ids. >>> with sheraf.connection(commit=True): ... peter = Cowboy.create(name="Peter", age=35) ... steven = Cowboy.create(name="Steven", age=35) ... george = Cowboy.create(name="George", age=50) ... >>> with sheraf.connection(): ... assert [peter, steven, george] == Cowboy.all() ... assert [peter, steven, george] == Cowboy.all().order(sheraf.ASC) ... assert [george, steven, peter] == Cowboy.all().order(sheraf.DESC) ... ... assert [george, peter, steven] == Cowboy.all().order(name=sheraf.ASC) ... assert [steven, peter, george] == Cowboy.all().order(name=sheraf.DESC) Several order parameters can be passed, either as arguments of the function, or by calling :func:`~sheraf.queryset.QuerySet.order` calls. >>> with sheraf.connection(): ... assert [george, peter, steven] == Cowboy.all().order(age=sheraf.DESC, name=sheraf.ASC) .. note:: Sorting on indexed attributes is more performant than sorting on other attributes. See :func:`~sheraf.attributes.Attribute.index`. The less :func:`~sheraf.queryset.QuerySet.order` parameters are passed, the better performances will be. """ if not self.model and (args or not kwargs): raise InvalidOrderException( "QuerySets without models should have explict order arguments" ) if len(args) > 1: raise InvalidOrderException( "Only one 'order' positionnal parameter is allowed." ) qs = self.copy() if self.model: for attribute, value in kwargs.items(): if attribute not in self.model.attributes: raise sheraf.exceptions.InvalidOrderException( f"{self.model.__name__} has no attribute {attribute}" ) if value not in (sheraf.constants.ASC, sheraf.constants.DESC): raise sheraf.exceptions.InvalidOrderException( "Parameter {} has an invalid order value {}".format( attribute, value ) ) if len(args) > 0: identifier = args[0] if identifier not in (sheraf.constants.ASC, sheraf.constants.DESC): raise InvalidOrderException( f"Parameter id has an invalid order value {identifier}" ) if self.primary_key in qs.orders: raise InvalidOrderException("Id order has been set twice") qs.orders[self.primary_key] = identifier common_attributes = set(qs.orders) & set(kwargs) if common_attributes: raise InvalidOrderException("Some order parameters appeared twice") qs.orders.update(kwargs) return qs
[docs] def get(self): """If the :class:`~sheraf.queryset.QuerySet` contains one, and only one item, this method returns the item. If the :class:`~sheraf.queryset.QuerySet` contains several objects, it raises a :class:`~sheraf.exceptions.QuerySetUnpackException`. If the :class:`~sheraf.queryset.QuerySet` is empty, it raises a :class:`~sheraf.exceptions.EmptyQuerySetUnpackException`. >>> with sheraf.connection(): ... peter = Cowboy.create(name="Peter") ... steven = Cowboy.create(name="Steven") ... assert peter == Cowboy.filter(name="Peter").get() ... Cowboy.all().get() Traceback (most recent call last): ... sheraf.exceptions.TooManyValuesSetUnpackException: Trying to unpack a QuerySet with multiple elements <QuerySet model=Cowboy> >>> with sheraf.connection(): ... Cowboy.filter(age=30).get() Traceback (most recent call last): ... sheraf.exceptions.EmptyQuerySetUnpackException: Trying to unpack an empty QuerySet >>> with sheraf.connection(): ... Cowboy.filter(name="Unknown cowboy").get() Traceback (most recent call last): ... sheraf.exceptions.EmptyQuerySetUnpackException: Trying to unpack an empty QuerySet """ try: element = next(self) except StopIteration: raise sheraf.exceptions.EmptyQuerySetUnpackException(queryset=self) try: next(self) except StopIteration: return element else: raise sheraf.exceptions.TooManyValuesSetUnpackException(queryset=self)