-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathtest_loader.py
282 lines (229 loc) · 9.47 KB
/
test_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import random
from datetime import datetime
import pytest
from async_generator import yield_, async_generator
from .models import db, User, Team, Company
# Module-level pytest marker: applies ``pytest.mark.asyncio`` to every test
# collected from this file.
pytestmark = pytest.mark.asyncio
@pytest.fixture
@async_generator
async def user(bind, random_name):
    """Yield a ``User`` that sits in a child team of a parent team.

    Creates one ``Company``, a parent ``Team`` and a child ``Team`` (both
    with that company's id), and a ``User`` whose nickname is ``random_name``
    and whose ``team_id`` is the child team.  The related instances are then
    assigned as attributes (``team``, ``parent``, ``company``) so tests can
    compare loaded rows against ``user.team``, ``user.team.parent`` and
    ``user.team.company``.

    ``bind`` is requested but not referenced in the body (presumably it
    provides the database binding; confirm against its definition).

    After the consuming test finishes, the ``User``, ``Team`` and
    ``Company`` delete statements are executed, in that order.
    """
    company = await Company.create()
    parent_team = await Team.create(company_id=company.id)
    child_team = await Team.create(company_id=company.id,
                                   parent_id=parent_team.id)
    member = await User.create(nickname=random_name, team_id=child_team.id)

    # Attach the related instances directly on the objects created above.
    member.team = child_team
    child_team.parent = parent_team
    child_team.company = company
    parent_team.company = company

    await yield_(member)

    # Teardown.
    await User.delete.gino.status()
    await Team.delete.gino.status()
    await Company.delete.gino.status()
async def test_model_alternative(user):
    """Passing the model class itself to ``load()`` yields a ``User``.

    The loaded instance must carry the same ``id`` and ``nickname`` as the
    fixture user.
    """
    loaded = await User.query.gino.load(User).first()

    assert isinstance(loaded, User)
    assert loaded.id == user.id
    assert loaded.nickname == user.nickname
async def test_scalar(user):
    """Load bare columns: one column gives a value, a tuple gives a tuple."""
    # Single column -> the result is the nickname itself.
    nickname = await User.query.gino.load(User.nickname).first()
    assert user.nickname == nickname

    # Tuple of columns -> the result unpacks into one value per column.
    columns = (User.id, User.nickname)
    user_id, nickname = await User.query.gino.load(columns).first()
    assert user.id == user_id
    assert user.nickname == nickname
async def test_model_load(user):
    """``User.load('nickname')`` fills ``nickname`` and leaves ``id`` None."""
    nickname_only = User.load('nickname')
    loaded = await User.query.gino.load(nickname_only).first()

    assert isinstance(loaded, User)
    assert loaded.id is None
    assert loaded.nickname == user.nickname
async def test_216_model_load_passive_partial(user):
    """``.gino.model(User)`` on a nickname-only select gives a partial User.

    Only ``User.nickname`` is selected, so the resulting instance is expected
    to have ``id`` set to ``None`` and the fixture user's nickname.
    """
    nickname_query = db.select([User.nickname])
    loaded = await nickname_query.gino.model(User).first()

    assert isinstance(loaded, User)
    assert loaded.id is None
    assert loaded.nickname == user.nickname
async def test_load_relationship(user):
    """Load a ``User`` together with its ``Team`` from an outer join."""
    joined = User.outerjoin(Team).select()
    loaded = await joined.gino.load(User.load(team=Team)).first()

    # The user itself.
    assert isinstance(loaded, User)
    assert loaded.id == user.id
    assert loaded.nickname == user.nickname

    # The team attached under the ``team`` attribute.
    team = loaded.team
    assert isinstance(team, Team)
    assert team.id == user.team.id
    assert team.name == user.team.name
async def test_load_nested(user):
    """Load ``User -> Team -> Company`` through five query spellings.

    Every spelling must produce the same object graph as the fixture: the
    user, its team, and that team's company.  All five queries are executed
    first; the results are checked afterwards.
    """
    # 1. Explicit outer joins with the nested loader applied to the select.
    explicit_joins = await User.outerjoin(Team).outerjoin(
        Company).select().gino.load(
            User.load(team=Team.load(company=Company))).first()
    # 2. The nested loader used directly as the query, no ``on()`` given.
    bare_loader = await User.load(
        team=Team.load(company=Company)).gino.first()
    # 3. ``on()`` condition supplied on the Company loader.
    company_on = await User.load(team=Team.load(
        company=Company.on(Team.company_id == Company.id))).gino.first()
    # 4. ``on()`` condition supplied on the Team loader after ``load()``.
    team_load_then_on = await User.load(
        team=Team.load(company=Company).on(
            User.team_id == Team.id)).gino.first()
    # 5. ``on()`` condition supplied on Team before ``load()``.
    team_on_then_load = await User.load(
        team=Team.on(User.team_id == Team.id).load(
            company=Company)).gino.first()

    results = (
        explicit_joins,
        bare_loader,
        company_on,
        team_load_then_on,
        team_on_then_load,
    )
    for loaded in results:
        assert isinstance(loaded, User)
        assert loaded.id == user.id
        assert loaded.nickname == user.nickname

        assert isinstance(loaded.team, Team)
        assert loaded.team.id == user.team.id
        assert loaded.team.name == user.team.name

        assert isinstance(loaded.team.company, Company)
        assert loaded.team.company.id == user.team.company.id
        assert loaded.team.company.name == user.team.company.name
async def test_func(user):
    """A plain callable can be used as the loader for a joined select.

    The callable receives ``(row, context)`` and builds the
    ``User -> Team -> Company`` graph by hand from the row's columns.
    """
    def build_user(row, context):
        company = Company(id=row[Company.id], name=row[Company.name])
        team = Team(id=row[Team.id], name=row[Team.name])
        built = User(id=row[User.id], nickname=row[User.nickname])
        built.team = team
        built.team.company = company
        return built

    joined = User.outerjoin(Team).outerjoin(Company).select()
    loaded = await joined.gino.load(build_user).first()

    assert isinstance(loaded, User)
    assert loaded.id == user.id
    assert loaded.nickname == user.nickname

    assert isinstance(loaded.team, Team)
    assert loaded.team.id == user.team.id
    assert loaded.team.name == user.team.name

    assert isinstance(loaded.team.company, Company)
    assert loaded.team.company.id == user.team.company.id
    assert loaded.team.company.name == user.team.company.name
async def test_adjacency_list(user):
    """Load a team's parent team through an alias of ``Team``.

    Also checks that calling an unknown attribute on the alias raises
    ``AttributeError``.  Four loader spellings (a hand-written callable and
    three ``load()`` forms) are run against the same explicit self-join and
    must all reproduce the fixture's user / team / parent-team graph.
    """
    parent_alias = Team.alias()

    with pytest.raises(AttributeError):
        parent_alias.non_exist()

    # noinspection PyUnusedLocal
    def manual_loader(row, context):
        built = User(id=row[User.id], nickname=row[User.nickname])
        built.team = Team(id=row[Team.id], name=row[Team.name])
        built.team.parent = Team(id=row[parent_alias.id],
                                 name=row[parent_alias.name])
        return built

    loaders = (
        manual_loader,
        User.load(team=Team.load(parent=parent_alias)),
        User.load(team=Team.load(parent=parent_alias.load('id', 'name'))),
        User.load(team=Team.load(parent=parent_alias.load())),
    )
    for loader in loaders:
        # The query is rebuilt for every loader.
        joined = User.outerjoin(Team).outerjoin(
            parent_alias, Team.parent_id == parent_alias.id).select()
        loaded = await joined.gino.load(loader).first()

        assert isinstance(loaded, User)
        assert loaded.id == user.id
        assert loaded.nickname == user.nickname

        assert isinstance(loaded.team, Team)
        assert loaded.team.id == user.team.id
        assert loaded.team.name == user.team.name

        assert isinstance(loaded.team.parent, Team)
        assert loaded.team.parent.id == user.team.parent.id
        assert loaded.team.parent.name == user.team.parent.name
async def test_adjacency_list_query_builder(user):
    """Load user, team and parent team with the loader acting as the query.

    The parent team is an alias of ``Team`` whose join condition is given
    through ``on()``; no explicit ``outerjoin()`` call is written.
    """
    parent_alias = Team.alias()
    parent_loader = parent_alias.on(Team.parent_id == parent_alias.id)
    loaded = await User.load(
        team=Team.load(parent=parent_loader)).gino.first()

    assert isinstance(loaded, User)
    assert loaded.id == user.id
    assert loaded.nickname == user.nickname

    assert isinstance(loaded.team, Team)
    assert loaded.team.id == user.team.id
    assert loaded.team.name == user.team.name

    assert isinstance(loaded.team.parent, Team)
    assert loaded.team.parent.id == user.team.parent.id
    assert loaded.team.parent.name == user.team.parent.name
async def test_literal(user):
    """Mix literal values, a callable and a column in one loader tuple.

    The loader tuple holds five random floats, a callable returning the
    current UTC time, and the ``time`` column of a raw ``SELECT now()``
    statement.  The loaded row must start with the five floats unchanged and
    end with two ``datetime`` values, the selected one being no later than
    the one computed by the callable.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12; build the same
    # naive UTC timestamp from an aware "now" instead.  It stays naive so it
    # can be compared with the selected value, as the original code did.
    from datetime import timezone

    sample = tuple(random.random() for _ in range(5))
    now = db.Column('time', db.DateTime())
    row = await db.first(db.text(
        'SELECT now() AT TIME ZONE \'UTC\''
    ).columns(now).gino.load(
        sample + (
            lambda r, c: datetime.now(timezone.utc).replace(tzinfo=None),
            now,
        )).query)

    assert row[:5] == sample
    assert isinstance(row[-2], datetime)
    assert isinstance(row[-1], datetime)
    # row[-1] is the selected time, row[-2] the callable's result.
    assert row[-1] <= row[-2]
async def test_load_one_to_many(user):
    """Load ``Company -> teams -> members`` with ``distinct()`` loaders.

    Three extra users are created: two with nickname ``'1'`` in the fixture
    user's team and one with nickname ``'2'`` in its parent team.  The single
    loaded company must expose two teams whose members together account for
    all four user ids.  A second query checks that users of one team share
    the very same ``Team`` instance when the team loader is distinct.
    """
    teammate_a = await User.create(nickname='1', team_id=user.team.id)
    teammate_b = await User.create(nickname='1', team_id=user.team.id)
    parent_member = await User.create(nickname='2',
                                      team_id=user.team.parent.id)
    # Ids still to be found; each one is removed as it is seen.
    pending_ids = [user.id, teammate_a.id, teammate_b.id, parent_member.id]

    joined = User.outerjoin(Team).outerjoin(Company).select()
    team_loader = Team.load(add_member=User).distinct(Team.id)
    company_loader = Company.distinct(Company.id).load(add_team=team_loader)
    companies = await joined.gino.load(company_loader).all()

    assert len(companies) == 1
    company = companies[0]
    assert isinstance(company, Company)
    assert company.id == user.team.company_id
    assert company.name == user.team.company.name
    assert len(company.teams) == 2

    for team in company.teams:
        if team.id != user.team.id:
            # Any other team: exactly one member.
            assert len(team.members) == 1
            pending_ids.remove(next(iter(team.members)).id)
            continue

        # The fixture user's own team: the user plus the two '1' users.
        assert len(team.members) == 3
        for member in team.members:
            if member.nickname == user.nickname:
                assert isinstance(member, User)
                assert member.id == user.id
                pending_ids.remove(member.id)
            if member.nickname in {'1', '2'}:
                pending_ids.remove(member.id)
    assert pending_ids == []

    # test distinct many-to-one
    same_team = User.outerjoin(Team).select().where(
        Team.id == user.team.id)
    users = await same_team.gino.load(
        User.load(team=Team.distinct(Team.id))).all()
    assert len(users) == 3
    shared_team = users[0].team
    assert shared_team is users[1].team
    assert shared_team is users[2].team
async def test_distinct_none(bind):
    """Compare plain and distinct team loaders for a user created bare.

    The user is created with no arguments.  With a plain ``Team`` loader the
    result has a ``team`` whose ``id`` is ``None``; with a
    ``Team.distinct(Team.id)`` loader the result has no ``team`` attribute.
    """
    created = await User.create()

    # Plain loader.
    plain_query = User.outerjoin(Team).select().where(User.id == created.id)
    plain = await plain_query.gino.load(User.load(team=Team)).first()
    assert plain.team.id is None

    # Distinct loader, filtered by the id of the instance loaded above.
    distinct_query = User.outerjoin(Team).select().where(
        User.id == plain.id)
    distinct_loader = User.load(team=Team.distinct(Team.id))
    distinct = await distinct_query.gino.load(distinct_loader).first()
    assert not hasattr(distinct, 'team')
async def test_distinct_tuple_loader(user):
    """Combine a distinct model loader and a ``ColumnLoader`` in a tuple.

    The query selects ``Company``, ``Team`` and a literal ``'test'`` column
    with labels enabled.  Exactly one result row is expected: a company with
    two teams, paired with the literal value.
    """
    from gino.loader import ColumnLoader

    literal = db.literal_column("'test'").label('test_column')
    selection = db.select([Company, Team, literal], use_labels=True)
    query = selection.select_from(Company.outerjoin(Team))

    loaders = (
        Company.distinct(Company.id).load(add_team=Team.distinct(Team.id)),
        ColumnLoader(query.c.test_column),
    )
    rows = await query.gino.load(loaders).all()

    assert len(rows) == 1
    only_row = rows[0]
    assert len(only_row[0].teams) == 2
    assert only_row[1] == 'test'
async def test_tuple_loader_279(user):
    """Iterate a two-model select with a tuple loader, given two ways.

    Both a bare ``(User, Team)`` tuple and an explicit ``TupleLoader`` must
    yield rows of length 2.  Iteration happens inside a transaction.
    """
    from gino.loader import TupleLoader

    query = db.select([User, Team])

    async def check_pairs(loading_query):
        async for row in loading_query.iterate():
            assert len(row) == 2

    async with db.transaction():
        await check_pairs(query.gino.load((User, Team)))
        await check_pairs(query.gino.load(TupleLoader((User, Team))))
async def test_none_as_none_281(user):
    """Check how a missing joined ``User`` is loaded from ``Team`` rows.

    Two version-gated variants run against ``Team.outerjoin(User)``:

    * gino before 0.9: with ``User.none_as_none()``, some row must have
      ``None`` in the user slot and some distinct team must have no members.
    * gino 0.8.0 or later: the same expectations with the plain ``User``
      loader.

    The version gates compare numeric release tuples.  The original string
    comparison ordered versions lexicographically, which misorders
    multi-digit components (``'0.10.0' < '0.9'`` is true for strings).
    """
    import re

    import gino

    def release_tuple(version):
        # Leading dotted-integer part only, e.g. '0.9.0b1' -> (0, 9, 0).
        numeric = re.match(r'\d+(?:\.\d+)*', version)
        assert numeric, 'unparseable gino version: %r' % (version,)
        return tuple(int(part) for part in numeric.group().split('.'))

    installed = release_tuple(gino.__version__)

    if installed < (0, 9):
        query = Team.outerjoin(User).select()
        loader = Team, User.none_as_none()
        assert any(row[1] is None
                   for row in await query.gino.load(loader).all())

        loader = Team.distinct(Team.id).load(add_member=User.none_as_none())
        assert any(not team.members
                   for team in await query.gino.load(loader).all())

    if installed >= (0, 8):
        query = Team.outerjoin(User).select()
        loader = Team, User
        assert any(row[1] is None
                   for row in await query.gino.load(loader).all())

        loader = Team.distinct(Team.id).load(add_member=User)
        assert any(not team.members
                   for team in await query.gino.load(loader).all())