-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsquadcast_client.py
152 lines (129 loc) · 3.46 KB
/
squadcast_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from typing import Any, List, Union
from typing_extensions import Annotated
import httpx
import env
from pydantic import BaseModel, Discriminator, Tag as ATag
# Base domain of the Squadcast tenancy; the auth and API hostnames are built
# from it as "auth.<tenancy>" and "api.<tenancy>". Defaults to "squadcast.com";
# set SQUADCAST_TENANCY to "eu.squadcast.com" for the EU tenancy.
squadcast_tenancy = env.get("SQUADCAST_TENANCY", default="squadcast.com")
class Tag(BaseModel):
    """A key/value label; ``Schedule.tags`` holds a list of these."""

    key: str
    value: str
class Schedule(BaseModel):
    """Schedule metadata.

    The fields match the ``schedule`` selection of the ``whoIsOncall``
    GraphQL query sent by ``SquadcastClient.get_schedules``.
    """

    ID: int  # an int here, unlike the string IDs on User, Squad and Participant
    name: str
    tags: List[Tag]
    teamID: str
    paused: bool
class User(BaseModel):
    """A single user.

    The fields match the ``... on User`` fragment (and the ``members``
    selection of the ``... on Squad`` fragment) of the ``whoIsOncall`` query.
    """

    ID: str
    name: str
    firstName: str
    lastName: str
    email: str  # the value collected by OncallSchedule.get_user_emails
class Squad(BaseModel):
    """A named group of users.

    The fields match the ``... on Squad`` fragment of the ``whoIsOncall`` query.
    """

    ID: str
    name: str
    # user_or_squad checks for "members" to tell a squad payload from a user
    # payload, so this field is what distinguishes the two shapes.
    members: List[User]
def user_or_squad(data: Any) -> str:
    """Return the union tag (``"squad"`` or ``"user"``) for a participant payload.

    Used as the callable discriminator for ``Participant.participant``. A
    payload counts as a squad when it carries ``members``; anything else is
    treated as a user.

    Args:
        data: The raw value being validated. This is a dict when parsing an
            API response, but may also be an already-constructed object
            (for example a model instance passed directly to the constructor).

    Returns:
        ``"squad"`` if ``data`` has a ``members`` key/attribute, else ``"user"``.
    """
    if isinstance(data, dict):
        return "squad" if "members" in data else "user"
    # Non-dict input: a plain ``in`` test would iterate the object (or fail
    # outright) rather than look at its fields, so check the attribute instead.
    return "squad" if hasattr(data, "members") else "user"
class Participant(BaseModel):
    """One on-call participant entry, wrapping either a ``User`` or a ``Squad``.

    The fields match the ``oncallParticipants`` selection of the
    ``whoIsOncall`` query.
    """

    ID: str
    # NOTE(review): ``type`` is stored as-is and is not what selects the model
    # below; the discriminator inspects the ``participant`` payload instead.
    type: str
    # Tagged union: user_or_squad returns "user" or "squad", and the member
    # whose ATag matches that string is the model the payload is validated as.
    participant: Annotated[
        Union[
            Annotated[User, ATag("user")],
            Annotated[Squad, ATag("squad")],
        ],
        Discriminator(user_or_squad),
    ]
class OncallSchedule(BaseModel):
    """A schedule together with the participants currently on call for it."""

    schedule: Schedule
    oncallParticipants: List[Participant]

    def get_user_emails(self) -> List[str]:
        """Return the email addresses of everyone on call for this schedule.

        A ``User`` participant contributes its own email; any other
        participant is treated as a squad and contributes the email of each
        of its members. Order follows ``oncallParticipants`` and duplicates
        are not removed.
        """
        collected = []
        for entry in self.oncallParticipants:
            who = entry.participant
            if isinstance(who, User):
                collected.append(who.email)
                continue
            # Not a User, so it is the Squad side of the union.
            collected.extend(member.email for member in who.members)
        return collected
class WhoIsOncall(BaseModel):
    """The ``data`` payload of the ``whoIsOncall`` GraphQL query."""

    # Each entry pairs a schedule with its on-call participants.
    whoIsOncall: List[OncallSchedule]
# GraphQL document sent by SquadcastClient.get_schedules. It lives at module
# level so the request text is not interleaved with the request handling code.
_WHO_IS_ONCALL_QUERY = """query whoIsOncall($filters: WhoIsOncallFilters) {
whoIsOncall(filters: $filters) {
schedule {
ID
name
paused
tags {
key
value
}
teamID
}
oncallParticipants {
ID
type
participant {
... on User {
ID
name
firstName
lastName
email
}
... on Squad {
ID
name
members {
ID
name
firstName
lastName
email
}
}
}
}
}
}
"""


class SquadcastClient(httpx.AsyncClient):
    """Async HTTP client for the Squadcast API using bearer-token auth.

    On construction the refresh token is exchanged for an access token, and
    every request sent through this client carries that token in its
    ``Authorization`` header.

    NOTE(review): the access token is fetched once in ``__init__``; nothing in
    this class refreshes it afterwards.
    """

    refresh_token: str
    access_token: str
    team_id: str

    def __init__(self, refresh_token: str, team_id: str) -> None:
        """Exchange ``refresh_token`` for an access token and set up the client.

        The exchange is a synchronous ``httpx.get`` call, so construction
        blocks until the auth endpoint has answered.

        Args:
            refresh_token: Squadcast refresh token, sent as ``X-Refresh-Token``.
            team_id: Team whose schedules ``get_schedules`` will query.

        Raises:
            Exception: If the auth endpoint answers with a status other than 200.
        """
        self.refresh_token = refresh_token
        self.team_id = team_id

        auth_resp = httpx.get(
            f"https://auth.{squadcast_tenancy}/oauth/access-token",
            headers={"X-Refresh-Token": refresh_token},
        )
        if auth_resp.status_code != 200:
            raise Exception(f"fetch access token: expected 200, got {auth_resp.status_code}")
        self.access_token = auth_resp.json()["data"]["access_token"]

        super().__init__(auth=self.auth, base_url=f"https://api.{squadcast_tenancy}")

    def auth(self, request: httpx.Request) -> httpx.Request:
        """Add the bearer access token to ``request`` and return it.

        Passed to the base client as its ``auth`` callable in ``__init__``.

        NOTE(review): this method name appears to shadow the ``auth``
        attribute httpx clients expose; it is kept because renaming it would
        change the public interface.
        """
        request.headers["Authorization"] = f"Bearer {self.access_token}"
        return request

    async def get_schedules(self) -> WhoIsOncall | Exception:
        """Query who is on call for every schedule of ``self.team_id``.

        Failures reported by the API are returned, not raised, as an
        ``Exception`` whose single argument is a dict with ``"status"`` and
        ``"error"`` keys.

        Returns:
            The parsed ``WhoIsOncall`` payload on success, otherwise an
            ``Exception`` describing the failure.
        """
        payload = {
            "query": _WHO_IS_ONCALL_QUERY,
            "variables": {"filters": {"teamID": self.team_id}},
        }
        resp = await self.post("/v3/graphql", json=payload)

        if resp.status_code != 200:
            # Look at the status before trusting the body: an error response
            # is not guaranteed to be JSON or to carry meta.error_message, and
            # failing to parse it must not hide the HTTP status from the caller.
            try:
                error = resp.json()["meta"]["error_message"]
            except (ValueError, KeyError, TypeError):
                error = resp.text
            return Exception({"status": resp.status_code, "error": error})

        body = resp.json()
        data = body.get("data") if isinstance(body, dict) else None
        if data is None:
            # GraphQL servers conventionally report query-level failures with
            # HTTP 200 and an "errors" entry instead of "data". Surface that
            # through the same return channel rather than crashing below.
            # NOTE(review): confirm the exact error shape Squadcast uses.
            errors = body.get("errors") if isinstance(body, dict) else body
            return Exception({"status": resp.status_code, "error": errors})

        return WhoIsOncall(**data)