import itertools
import operator
from collections import OrderedDict
from import Iterable
from import Iterator

import sheraf.constants
from sheraf.exceptions import InvalidFilterException
from sheraf.exceptions import InvalidOrderException
from import unique_everseen

[docs]class QuerySet: """ A :class:`~sheraf.queryset.QuerySet` is a collection containing :class:`~sheraf.models.Model` instances. Like in a regular :class:`set`, objects are unique, but the main difference is that :class:`~sheraf.queryset.QuerySet` keeps the insertion order. :param iterable: A collection of models. If `iterable` is None, then `model` must be set. :type iterable: Iterable :param model_class: A model class to iterate over. If `model` is None, then `iterable` must be set. If both are set, `model_class` is ignored. :type reversed_iterable: A Model class :param predicate: a callable takes an instance as parameter and return a Boolean (if True, the instance will be returned in the iteration). If None, everything is returned. :type predicate: Predicate :param kwargs: A dictionnary containing the values expected from the model parameters. If `kwargs` is `{"foo": "bar"}` then the queryset will only contains models which attribute `foo` is `"bar"`. For the following examples, let us work with a simple **Cowboy** model. For the sake of simplicity we use a :class:`~sheraf.models.IntOrderedNamedAttributesModel` so the first instance created will have id 0, the second will have id 1 and so on... >>> class Cowboy(sheraf.IntOrderedNamedAttributesModel): ... table = "queryset_people" ... name = sheraf.SimpleAttribute() ... age = sheraf.SimpleAttribute() ... >>> with sheraf.connection(commit=True): ... peter = Cowboy.create(name="Peter", age=30) ... steven = Cowboy.create(name="Steven", age=30) ... george = Cowboy.create(name="George Abitbol", age=50) :class:`~sheraf.queryset.QuerySet` are mostly created by doing requests on a Model with :func:`~sheraf.models.indexation.IndexedModel.all`, :func:`~sheraf.models.indexation.IndexedModel.filter` or :func:`~sheraf.models.indexation.IndexedModel.order`, but can also be initialized with custom data. >>> with sheraf.connection(): ... a = Cowboy.all() # returns a QuerySet with all Cowboy ... b = QuerySet([peter, steven, george]) # This is an equivalent custom QuerySet ... assert list(a) == list(b) :class:`~sheraf.queryset.QuerySet` behave like iterators, and can only be consumed once. >>> with sheraf.connection(): ... everybody = Cowboy.all() ... assert [peter, steven, george] == list(everybody) ... assert [] == everybody .. note :: :class:`~sheraf.queryset.QuerySet` can be compared against anything iterable, but the comparison will consume the :class:`~sheraf.queryset.QuerySet`. :class:`~sheraf.queryset.QuerySet` keeps the order of insertion. >>> assert QuerySet([peter, steven]) != QuerySet([steven, peter]) :class:`~sheraf.queryset.QuerySet` supports slicing. Slices returns another :class:`~sheraf.queryset.QuerySet`. >>> with sheraf.connection(): ... assert peter == Cowboy.all()[0] ... assert QuerySet([peter, steven]) == Cowboy.all()[0:2] ... assert QuerySet([peter, steven, george]) == Cowboy.all()[0:] """ def __init__( self, iterable=None, model_class=None, predicate=None, primary_key=None, **kwargs, ): self.filters = OrderedDict(kwargs) self._iterable = iterable self._iterator = None self._predicate = predicate self._start = None self._stop = None self._step = None self.model = model_class if primary_key: self.primary_key = primary_key elif model_class: self.primary_key = model_class.primary_key() self.orders = OrderedDict() if iterable is None and model_class is None: self._iterable = [] self.__iter__() def __iter__(self): return self def __repr__(self): result = "<QuerySet" if self.model: result = f"{result} model={self.model.__name__}" if self._iterable: result = f"{result} iterable={self._iterable}" if self._predicate: result = f"{result} predicate={self._predicate}" if self.filters: filters = ", ".join(f"{k}={v[1]}" for k, v in self.filters.items()) result = f"{result} filters=({filters})" result = f"{result}>" return result def __next__(self): if not self._iterator: self._init_iterator() return next(self._iterator) def __eq__(self, other): if isinstance(other, Iterable): return all( a == b for a, b in itertools.zip_longest( iter(self), iter(other), fillvalue=object() ) ) return super().__eq__(other) def __and__(self, other): return QuerySet(set(self) & set(other)) def __or__(self, other): return QuerySet(set(self) | set(other)) def __xor__(self, other): return QuerySet(set(self) ^ set(other)) def __add__(self, other): return QuerySet(unique_everseen(itertools.chain(self, other))) def __bool__(self): try: next(self.copy()) self._iterator = None return True except StopIteration: return False def __getitem__(self, item): qs = self.copy() if isinstance(item, slice): qs._start, qs._stop, qs._step = item.start, item.stop, item.step return qs else: qs._start, qs._stop, qs._step = item, item + 1, 1 return next(qs) def __len__(self): """ :return: The number of objects in the :class:`~sheraf.queryset.QuerySet`. >>> with sheraf.connection(): ... assert Cowboy.create() ... qs = Cowboy.all() ... assert len(qs) == 1 """ # No shortcut possible when there is a predicate, # or when there are several filters if self._predicate or not self.model or len(self.filters) > 1: return sum(1 for _ in self.copy()) # We basically want to search all the models if not self.filters: return self.model.count() # Take the unique filter, and hope it is indexed index_name = list(self.filters.keys())[0] _, filter_value, filter_search_func = list(self.filters.values())[0] if index_name not in self.model.indexes: return sum(1 for _ in self.copy()) index = self.model.indexes[index_name] if filter_search_func: index_values = index.details.call_search_func(self.model, filter_value) else: index_values = filter_value if index.details.unique: return sum(int(index.has_item(v)) for v in index_values) else: return sum( len(index.get_item(v)) for v in index_values if index.has_item(v) ) @property def indexed_filters(self): return [ ( name, value, search_func, self.orders.get(name) == sheraf.constants.DESC, ) for (name, value, search_func) in self.filters.values() if name in self.model.indexes and self.model.indexes[name] ] @property def non_indexed_filters(self): return [ (name, value) for (name, value, _) in self.filters.values() if (name not in self.model.indexes and name in self.model.attributes) or ( name in self.model.indexes and not self.model.indexes[name] ) ] def _index_keys(self, index, value, search_func): return ( index.details.call_search_func(self.model, value) if search_func else [value] ) def _objects_ids(self, name, value, search_func, reverse): index = self.model.indexes[name] keys = self._index_keys(index, value, search_func) if index.details.unique: mappings = (index.get_item(key, True) for key in keys) else: mappings_lists = (index.get_item(key, True) for key in keys) mappings = (m for l in mappings_lists if l for m in l) return (m[self.model.primary_key()] for m in mappings if m) def _multi_indexes_iterator(self): ids_sets = (set(self._objects_ids(*index)) for index in self.indexed_filters) raw_ids = set.intersection(*ids_sets) pk_attribute = self.model.attributes[self.model.primary_key()] ids = (pk_attribute.deserialize(id_) for id_ in raw_ids) return ids def _mono_indexes_iterator(self): pk_attribute = self.model.attributes[self.model.primary_key()] raw_ids = self._objects_ids(*self.indexed_filters[0]) unique_raw_ids = unique_everseen(raw_ids) ids = (pk_attribute.deserialize(id_) for id_ in unique_raw_ids) return ids def _primary_index_iterator(self): identifier_index = self.model.indexes[self.primary_key] pk_attribute = self.model.attributes[self.model.primary_key()] reverse = self.orders.get(self.primary_key) == sheraf.constants.DESC if self.primary_key == self.model.primary_key(): ids = identifier_index.iterkeys(reverse) elif self.model.indexes[self.primary_key].details.unique: ids = ( pk_attribute.deserialize(mapping[self.model.primary_key()]) for mapping in identifier_index.itervalues(reverse) ) else: ids = ( pk_attribute.deserialize(id) for mappings in identifier_index.itervalues(reverse) for id in mappings.keys() ) return ids def _init_iterator(self): if self._iterable: iterator = iter(self._iterable) elif not self.model: iterator = iter([]) # iterate all items on the primary key elif not self.indexed_filters: iterator = self._primary_index_iterator() # iterator on several indexed filters elif not self.orders: iterator = self._mono_indexes_iterator() if self.model: iterator = self.model.read_these(iterator) # Checks the models fits all the filters iterator = ( model for model in iterator if self._model_has_expected_values(model) ) # Successively sorts the list from the less important # order to the most important order. if self.orders: iterable = iterator for attribute, order in reversed(self.orders.items()): iterable = sorted( iterable, key=operator.attrgetter(attribute), reverse=(order == sheraf.constants.DESC), ) iterator = iter(iterable) # Only select a slice of the wanted models if self._start is not None or self._stop is not None or self._step is not None: iterator = itertools.islice(iterator, self._start, self._stop, self._step) self._iterator = iterator def _model_has_expected_values(self, model): if not all( getattr(model, filter_name) == expected_value for filter_name, expected_value in self.non_indexed_filters ): return False if not all( ( set(model.indexes[name].details.call_search_func(model, value)) & set(model.indexes[name].details.get_model_index_keys(model)) ) if search_func else (value in model.indexes[name].details.get_model_index_keys(model)) for name, value, search_func, _ in self.indexed_filters ): return False return not self._predicate or self._predicate(model) def count(self): return len(self)
[docs] def copy(self): """Copies the :class:`~sheraf.queryset.QuerySet` without consuming it. >>> with sheraf.connection(): ... peter = Cowboy.create(name="Peter") ... steven = Cowboy.create(name="Steven") ... george = Cowboy.create(name="George") ... qall = Cowboy.all() ... qcopy = qall.copy() ... ... assert [peter, steven, george] == qall ... # now qall is consumed ... ... assert [peter, steven, george] == qcopy ... # now qcopy is consumed """ if isinstance(self._iterable, Iterator): self._iterable, iterable = itertools.tee(self._iterable) else: iterable = self._iterable qs = QuerySet(iterable, self.model) qs.filters = self.filters.copy() qs.orders = self.orders.copy() qs._predicate = self._predicate return qs
[docs] def delete(self): """Delete the objects contained in the queryset. Avoids problems when itering on deleted objects. """ identifiers = [(m.__class__, m.identifier) for m in self] for klass, identifier in identifiers:
[docs] def filter(self, predicate=None, **kwargs): """Refine a copy of the current :class:`~sheraf.queryset.QuerySet` with further tests. :param predicate: filter instance by returning a truthy value. If `None` everything is selected. :type predicate: callable object :param kwargs: A dictionnary containing the values expected from the model parameters. If ``kwargs`` is ``{"foo": "bar"}`` then the queryset will only contains models which attribute ``foo`` is ``"bar"``. :type kwargs: A dictionary which keys must be valid attributes of the model iterated. :return: A copy of the current :class:`~sheraf.queryset.QuerySet` refined with further tests. :return type: :class:`~sheraf.queryset.QuerySet` It is possible to chain :func:`~sheraf.queryset.QuerySet.filter` calls: >>> with sheraf.connection(): ... assert Cowboy.filter(name="George Abitbol", age=50) == \\ ... Cowboy.filter(name="George Abitbol").filter(age=50) ... >>> with sheraf.connection(): ... assert Cowboy.filter(lambda person: "Abitbol" in, age=50) == \\ ... Cowboy.filter(lambda person: "Abitbol" in An attribute cannot be filtered twice: >>> with sheraf.connection(): ... Cowboy.filter(age=30).filter(age=40) Traceback (most recent call last): ... sheraf.exceptions.InvalidFilterException: Some filter parameters appeared twice .. note:: Filtering on indexed attributes is more performant than filtering on non-indexed attributes. See :func:`~sheraf.attributes.Attribute.index`. """ return self._filter(False, predicate=predicate, **kwargs)
[docs] def search(self, **kwargs): """ Refine a copy of the current :class:`~sheraf.queryset.QuerySet` with further tests. This method is very similar to :func:`~sheraf.queryset.QuerySet.filter` except the values it takes are transformed with the same way values are transformed at indexation. TODO: pas très clair For instance, if an attribute indexes its values with a lowercase search_func, the :func:`` attributes will go through the same search_func. Hence it allows to pass uppercase filter values, while :func:`~sheraf.queryset.QuerySet.filter` does not allow this. >>> class MyCustomModel(sheraf.Model): ... table = "my_custom_model" ... my_attribute = sheraf.SimpleAttribute().index( ... index_keys_func=lambda string: {string.lower()} ... ) ... >>> with sheraf.connection(commit=True): ... m = MyCustomModel.create(my_attribute="FOO") ... >>> with sheraf.connection(): ... assert [m] =="foo") ... assert [m] == MyCustomModel.filter(my_attribute="foo") ... ... assert [m] =="FOO") ... assert [] == MyCustomModel.filter(my_attribute="FOO") """ return self._filter(True, **kwargs)
def _filter(self, search_func, predicate=None, **kwargs): qs = self.copy() if self.model: for filter_name in kwargs.keys(): if ( filter_name not in self.model.attributes and filter_name not in self.model.indexes ): raise sheraf.exceptions.InvalidFilterException( "{} has no attribute {}".format( self.model.__name__, filter_name ) ) kwargs_values = OrderedDict( { filter_name: (filter_name, filter_value, search_func) for filter_name, filter_value in kwargs.items() } ) common_attributes = set(qs.filters) & set(kwargs_values) invalid_common_attributes = any( key for key in common_attributes if qs.filters[key] != kwargs_values[key] ) if invalid_common_attributes: raise InvalidFilterException("Some filter parameters appeared twice") qs.filters.update(kwargs_values) if not qs._predicate: qs._predicate = predicate elif predicate: old_predicate = qs._predicate qs._predicate = lambda m: old_predicate(m) and predicate(m) return qs
[docs] def order(self, *args, **kwargs): """Copies the current :class:`~sheraf.queryset.QuerySet` and adds more order to it. :param args: There can be only one positionnal argument. Choose to iterate over ids in an ascending or a descending way. :type args: ``sheraf.ASC`` or ``sheraf.DESC`` :param kwargs: Further parameters will set an order on the matching model attributes. :type kwargs: A dictionary which keys must be valid attributes of the model iterated, and the values must be ``sheraf.ASC`` or ``sheraf.DESC`` :return: A copy of the current :class:`~sheraf.queryset.QuerySet` with refined order. :return type: :class:`~sheraf.queryset.QuerySet` The default order is the ascending model ids. >>> with sheraf.connection(commit=True): ... peter = Cowboy.create(name="Peter", age=35) ... steven = Cowboy.create(name="Steven", age=35) ... george = Cowboy.create(name="George", age=50) ... >>> with sheraf.connection(): ... assert [peter, steven, george] == Cowboy.all() ... assert [peter, steven, george] == Cowboy.all().order(sheraf.ASC) ... assert [george, steven, peter] == Cowboy.all().order(sheraf.DESC) ... ... assert [george, peter, steven] == Cowboy.all().order(name=sheraf.ASC) ... assert [steven, peter, george] == Cowboy.all().order(name=sheraf.DESC) Several order parameters can be passed, either as arguments of the function, or by calling :func:`~sheraf.queryset.QuerySet.order` calls. >>> with sheraf.connection(): ... assert [george, peter, steven] == Cowboy.all().order(age=sheraf.DESC, name=sheraf.ASC) .. note:: Sorting on indexed attributes is more performant than sorting on other attributes. See :func:`~sheraf.attributes.Attribute.index`. The less :func:`~sheraf.queryset.QuerySet.order` parameters are passed, the better performances will be. """ if not self.model and (args or not kwargs): raise InvalidOrderException( "QuerySets without models should have explict order arguments" ) if len(args) > 1: raise InvalidOrderException( "Only one 'order' positionnal parameter is allowed." ) qs = self.copy() if self.model: for attribute, value in kwargs.items(): if attribute not in self.model.attributes: raise sheraf.exceptions.InvalidOrderException( f"{self.model.__name__} has no attribute {attribute}" ) if value not in (sheraf.constants.ASC, sheraf.constants.DESC): raise sheraf.exceptions.InvalidOrderException( "Parameter {} has an invalid order value {}".format( attribute, value ) ) if len(args) > 0: identifier = args[0] if identifier not in (sheraf.constants.ASC, sheraf.constants.DESC): raise InvalidOrderException( f"Parameter id has an invalid order value {identifier}" ) if self.primary_key in qs.orders: raise InvalidOrderException("Id order has been set twice") qs.orders[self.primary_key] = identifier common_attributes = set(qs.orders) & set(kwargs) if common_attributes: raise InvalidOrderException("Some order parameters appeared twice") qs.orders.update(kwargs) return qs
[docs] def get(self): """If the :class:`~sheraf.queryset.QuerySet` contains one, and only one item, this method returns the item. If the :class:`~sheraf.queryset.QuerySet` contains several objects, it raises a :class:`~sheraf.exceptions.QuerySetUnpackException`. If the :class:`~sheraf.queryset.QuerySet` is empty, it raises a :class:`~sheraf.exceptions.EmptyQuerySetUnpackException`. >>> with sheraf.connection(): ... peter = Cowboy.create(name="Peter") ... steven = Cowboy.create(name="Steven") ... assert peter == Cowboy.filter(name="Peter").get() ... Cowboy.all().get() Traceback (most recent call last): ... sheraf.exceptions.QuerySetUnpackException: Trying to unpack more than 1 value from a QuerySet >>> with sheraf.connection(): ... Cowboy.filter(age=30).get() Traceback (most recent call last): ... sheraf.exceptions.EmptyQuerySetUnpackException: Trying to unpack an empty QuerySet >>> with sheraf.connection(): ... Cowboy.filter(name="Unknown cowboy").get() Traceback (most recent call last): ... sheraf.exceptions.EmptyQuerySetUnpackException: Trying to unpack an empty QuerySet """ try: element = next(self) except StopIteration: raise sheraf.exceptions.EmptyQuerySetUnpackException() try: next(self) except StopIteration: return element else: raise sheraf.exceptions.QuerySetUnpackException()