forked from kolypto/py-mongosql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery.py
153 lines (129 loc) · 5.43 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from sqlalchemy.orm import Query, Load, defaultload
from sqlalchemy.orm.query import QueryContext
from sqlalchemy.sql import func
from .model import MongoModel
class MongoQuery(object):
""" MongoDB-style queries """
@classmethod
def get_for(cls, model, *args, **kwargs):
""" Get MongoQuery for a model.
Attempts to use `mongoquery` property of the model
:param model: Model
:type model: mongosql.MongoSqlBase|sqlalchemy.ext.declarative.DeclarativeMeta
:rtype: MongoQuery
"""
try:
return model.mongoquery(*args, **kwargs)
except AttributeError:
return cls(MongoModel.get_for(model), *args, **kwargs)
def __init__(self, model, query, _as_relation=None):
""" Init a MongoDB-style query
:param model: MongoModel
:type model: mongosql.MongoModel
:param query: Query to work with
:type query: sqlalchemy.orm.Query
:param _as_relation: Parent relationship.
Internal argument used when working with deeper relations:
is used as initial path for defaultload(_as_relation).lazyload(...).
:type _as_relation: sqlalchemy.orm.relationships.RelationshipProperty
"""
assert isinstance(model, MongoModel)
assert isinstance(query, Query)
self._model = model
self._query = query
self._as_relation = defaultload(_as_relation) if _as_relation else Load(self._model.model)
self._no_joindefaults = False
def aggregate(self, agg_spec):
""" Select aggregated results """
a = self._model.aggregate(agg_spec)
if a:
self._query = self._query.with_entities(*a)
self._no_joindefaults = True # no relationships should be loaded
# When no model criteria is specified, like COUNT(*), SqlAlchemy won't set the FROM clause
# Thus, we need to explicitly set the `FROM` clause in these cases
if self._query.whereclause is None:
self._query = self._query.select_from(self._model.model)
return self
def project(self, projection):
""" Apply a projection to the query """
p = self._model.project(projection, as_relation=self._as_relation)
if self._model.model.__name__ == 'User':
assert 1
self._query = self._query.options(p)
return self
def sort(self, sort_spec):
""" Apply sorting to the query """
s = self._model.sort(sort_spec)
self._query = self._query.order_by(*s)
return self
def group(self, group_spec):
""" Apply grouping to the query """
g = self._model.group(group_spec)
self._query = self._query.group_by(*g)
return self
def filter(self, criteria):
""" Add criteria to the query """
c = self._model.filter(criteria)
self._query = self._query.filter(c)
return self
def limit(self, limit=None, skip=None):
""" Slice results """
limit, skip = self._model.limit(limit, skip)
if skip:
self._query = self._query.offset(skip)
if limit:
self._query = self._query.limit(limit)
return self
def join(self, relnames):
""" Eagerly load relations """
for mjp in self._model.join(relnames, as_relation=self._as_relation):
# Complex joins
if mjp.query:
self._query = self.get_for(
mjp.target_model,
self._query.join(mjp.relationship),
_as_relation=mjp.relationship
)\
.query(**mjp.query)\
.end()
# Options
self._query = self._query.options(*mjp.options)
self._query = self._query.with_labels()
self._no_joindefaults = True
return self
def count(self):
""" Count rows instead """
self._query = self._query.from_self(func.count(1))
self._no_joindefaults = True # no relationships should be loaded
return self
def query(self, project=None, sort=None, group=None, filter=None, skip=None, limit=None, join=None, aggregate=None, count=False, **__unk):
""" Build a query
:param project: Projection spec
:param sort: Sorting spec
:param group: Grouping spec
:param filter: Filter criteria
:param skip: Skip rows
:param limit: Limit rows
:param join: Eagerly load relations
:param aggregate: Select aggregated results
:param count: True to count rows instead
:raises AssertionError: unknown Query Object operations provided (extra keys)
:rtype: MongoQuery
"""
assert not __unk, 'Unknown Query Object operations: {}'.format(__unk.keys())
q = self
if join: q = q.join(join)
if project: q = q.project(project)
if aggregate: q = q.aggregate(aggregate)
if filter: q = q.filter(filter)
if sort: q = q.sort(sort)
if group: q = q.group(group)
if skip or limit: q = q.limit(limit, skip)
return q.count() if count else q
def end(self):
""" Get the Query object
:rtype: sqlalchemy.orm.Query
"""
if not self._no_joindefaults:
self.join(()) # have to join with an empty list explicitly so all relations get noload()
return self._query