-
Notifications
You must be signed in to change notification settings - Fork 11
/
vcmanager.py
326 lines (296 loc) · 10.9 KB
/
vcmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import contextlib
from datetime import datetime
from telethon import functions
from telethon.errors import ChatAdminRequiredError, UserAlreadyInvitedError
from telethon.tl.types import Channel, Chat, User
from telethon.utils import get_display_name
from userbot import catub
from userbot.core.data import _sudousers_list, _vcusers_list
from userbot.core.managers import edit_delete, edit_or_reply
from userbot.helpers.utils import mentionuser
from userbot.sql_helper import global_collectionjson as sql
# Category name passed as the second element of every
# ``command=(name, plugin_category)`` tuple in this module's
# ``catub.cat_cmd`` registrations.
plugin_category = "extra"
async def get_group_call(chat):
    """Return the group call attached to ``chat``, if there is one.

    Args:
        chat: An entity object. Only ``Channel`` and ``Chat`` instances are
            queried; anything else is treated as having no group call.

    Returns:
        The ``call`` attribute of the chat's full-chat info (callers in this
        module treat a falsy value as "no call"), or ``None`` when ``chat`` is
        neither a ``Channel`` nor a ``Chat``.
    """
    if isinstance(chat, Channel):
        result = await catub(functions.channels.GetFullChannelRequest(channel=chat))
    elif isinstance(chat, Chat):
        result = await catub(functions.messages.GetFullChatRequest(chat_id=chat.id))
    else:
        # Without this branch ``result`` would be unbound below and the
        # function would die with UnboundLocalError for any other entity type.
        return None
    return result.full_chat.call
async def chat_vc_checker(event, chat, edits=True):
    """Fetch the group call of ``chat`` and report to the user when absent.

    Args:
        event: The triggering event; used as the target of ``edit_delete``.
        chat: The entity to inspect.
        edits: When true, a "No Group Call" notice is sent if the chat has no
            call. The private-chat notice is sent regardless of this flag.

    Returns:
        The value from ``get_group_call`` when it is truthy, otherwise ``None``.
    """
    if isinstance(chat, User):
        await edit_delete(event, "Voice Chats are not available in Private Chats")
        return None

    group_call = await get_group_call(chat)
    if group_call:
        return group_call

    # No call found: optionally tell the user, then signal absence.
    if edits:
        await edit_delete(event, "No Group Call in this chat")
    return None
async def parse_entity(entity):
    """Resolve a textual identifier through ``catub.get_entity``.

    Args:
        entity: A string such as a username or a numeric id.

    Returns:
        Whatever ``catub.get_entity`` returns for the identifier; strings made
        up solely of decimal digits are converted to ``int`` first.
    """
    # ``isdecimal`` matches exactly what ``int`` can parse. ``isnumeric`` also
    # accepts characters such as "²" or "½", for which ``int`` raises
    # ValueError.
    if entity.isdecimal():
        entity = int(entity)
    return await catub.get_entity(entity)
@catub.cat_cmd(
    pattern="vcstart",
    command=("vcstart", plugin_category),
    info={
        # The help text used to be a copy of the ``vcend`` entry.
        "header": "To start a Voice Chat.",
        "description": "To start a Voice Chat in the current chat",
        "usage": "{tr}vcstart",
        "examples": "{tr}vcstart",
    },
)
async def start_vc(event):
    """Start a group call titled "Cat VC" in the chat the command was sent in.

    Sends a notice and does nothing else when the chat already has a group
    call, and reports a missing-admin-rights error to the user.
    """
    vc_chat = await catub.get_entity(event.chat_id)
    # edits=False: a missing call is the expected case here, so the
    # "No Group Call" notice is suppressed.
    gc_call = await chat_vc_checker(event, vc_chat, False)
    if gc_call:
        return await edit_delete(event, "Group Call is already available in this chat")
    try:
        await catub(
            functions.phone.CreateGroupCallRequest(
                peer=vc_chat,
                title="Cat VC",
            )
        )
        await edit_delete(event, "Started Group Call")
    except ChatAdminRequiredError:
        await edit_delete(event, "You should be chat admin to start vc", time=20)
@catub.cat_cmd(
    pattern="vcend",
    command=("vcend", plugin_category),
    info={
        "header": "To end a stream on Voice Chat.",
        "description": "To end a stream on Voice Chat",
        "usage": "{tr}vcend",
        "examples": "{tr}vcend",
    },
)
async def end_vc(event):
    """Discard the group call of the chat the command was sent in.

    Does nothing further when ``chat_vc_checker`` finds no call. A
    ``ChatAdminRequiredError`` is reported to the user instead of propagating.
    """
    chat_entity = await catub.get_entity(event.chat_id)
    active_call = await chat_vc_checker(event, chat_entity)
    if active_call:
        try:
            discard_request = functions.phone.DiscardGroupCallRequest(call=active_call)
            await catub(discard_request)
            await edit_delete(event, "Group Call Ended")
        except ChatAdminRequiredError:
            await edit_delete(event, "You should be chat admin to kill vc", time=20)
@catub.cat_cmd(
    pattern="vcinv ?(.*)?",
    command=("vcinv", plugin_category),
    info={
        "header": "To invite users on Voice Chat.",
        "usage": "{tr}vcinv < userid/username or reply to user >",
        "examples": [
            "{tr}vcinv @angelpro",
            "{tr}vcinv userid1 userid2",
        ],
    },
)
async def inv_vc(event):
    """Invite one or more users to the current chat's group call.

    Users are taken from the whitespace-separated command argument, or from
    the sender of the replied-to message when no argument is given. Only
    identifiers that resolve to ``User`` entities are invited.
    """
    users = event.pattern_match.group(1)
    reply = await event.get_reply_message()
    vc_chat = await catub.get_entity(event.chat_id)
    gc_call = await chat_vc_checker(event, vc_chat)
    if not gc_call:
        return
    if not users:
        # ``sender_id`` is the plain integer id. ``from_id`` is a Peer object
        # whose ``str()`` form cannot be resolved by ``parse_entity``.
        users = reply.sender_id if reply else None
        if users is None:
            # ``edit_delete`` needs the event as its first argument.
            return await edit_delete(event, "Whom Should i invite")
    await edit_or_reply(event, "Inviting User to Group Call")
    user_list = []
    # ``split()`` without a separator drops the empty tokens produced by
    # repeated or trailing spaces.
    for entity in str(users).split():
        cc = await parse_entity(entity)
        if isinstance(cc, User):
            user_list.append(cc)
    if not user_list:
        # Nothing resolved to a user; do not send an empty invite request.
        return await edit_delete(event, "No valid users to invite", time=20)
    try:
        await catub(
            functions.phone.InviteToGroupCallRequest(call=gc_call, users=user_list)
        )
        await edit_delete(event, "Invited users to Group Call")
    except UserAlreadyInvitedError:
        return await edit_delete(event, "User is Already Invited", time=20)
@catub.cat_cmd(
    pattern="vcinfo",
    command=("vcinfo", plugin_category),
    info={
        "header": "To get info of Voice Chat.",
        "usage": "{tr}vcinfo",
        "examples": "{tr}vcinfo",
    },
)
async def info_vc(event):
    """Show the title, participant count and returned users of the group call.

    The request is issued with ``limit=1``.
    NOTE(review): that presumably caps how many participants come back, so the
    listing may be shorter than the reported count - confirm.
    """
    chat_entity = await catub.get_entity(event.chat_id)
    active_call = await chat_vc_checker(event, chat_entity)
    if not active_call:
        return

    await edit_or_reply(event, "Getting Group Call Info")
    details = await catub(
        functions.phone.GetGroupCallRequest(call=active_call, limit=1)
    )

    # Collect the message pieces and join them once at the end.
    pieces = [
        "**Group Call Info**\n\n",
        f"**Title :** {details.call.title}\n",
        f"**Participants Count :** {details.call.participants_count}\n\n",
    ]
    if details.call.participants_count > 0:
        pieces.append("**Participants**\n")
        for member in details.users:
            full_name = f"{member.first_name or ''} {member.last_name or ''}"
            pieces.append(f" ● {mentionuser(full_name,member.id)} - `{member.id}`\n")
    await edit_or_reply(event, "".join(pieces))
@catub.cat_cmd(
    # The old pattern ``vctitle?(.*)?`` made the final "e" optional and kept
    # the separating space as part of the title. This form requires the full
    # command word followed by whitespace or end of text.
    pattern=r"vctitle(?:\s|$)(.*)",
    command=("vctitle", plugin_category),
    info={
        # The help text used to be a copy of the ``vcend`` entry.
        "header": "To change the title of Voice Chat.",
        "description": "To change the title of the ongoing Voice Chat",
        "usage": "{tr}vctitle <text>",
        "examples": "{tr}vctitle CatPro",
    },
)
async def title_vc(event):
    """Change the title of the current chat's group call.

    The new title is the command argument with surrounding whitespace removed.
    A notice is sent when no title is supplied.
    """
    title = (event.pattern_match.group(1) or "").strip()
    vc_chat = await catub.get_entity(event.chat_id)
    gc_call = await chat_vc_checker(event, vc_chat)
    if not gc_call:
        return
    if not title:
        # ``edit_delete`` needs the event as its first argument.
        return await edit_delete(event, "What should i keep as title")
    await catub(functions.phone.EditGroupCallTitleRequest(call=gc_call, title=title))
    await edit_delete(event, f"VC title was changed to **{title}**")
@catub.cat_cmd(
    # Raw string avoids the invalid "\s" escape. ``(?:\s|$)`` lets the command
    # match with no argument, so the reply-to-a-user form is reachable.
    pattern=r"vc(|un)mute(?:\s|$)([\s\S]*)",
    command=("vcmute", plugin_category),
    info={
        "header": "To mute users on Voice Chat.",
        "description": "To mute a stream on Voice Chat",
        "usage": [
            "{tr}vcmute < userid/username or reply to user >",
        ],
        "examples": [
            "{tr}vcmute @angelpro",
            "{tr}vcmute userid1 userid2",
        ],
    },
)
async def mute_vc(event):
    """Mute or unmute users in the current chat's group call.

    The first pattern group is ``"un"`` for ``vcunmute`` and empty for
    ``vcmute``. Users are taken from the whitespace-separated argument, or
    from the sender of the replied-to message when no argument is given. Only
    identifiers that resolve to ``User`` entities are acted on.
    """
    cmd = event.pattern_match.group(1)
    users = (event.pattern_match.group(2) or "").strip()
    reply = await event.get_reply_message()
    vc_chat = await catub.get_entity(event.chat_id)
    gc_call = await chat_vc_checker(event, vc_chat)
    if not gc_call:
        return
    check = "Unmute" if cmd else "Mute"
    if not users:
        # ``sender_id`` is the plain integer id. ``from_id`` is a Peer object
        # whose ``str()`` form cannot be resolved by ``parse_entity``.
        users = reply.sender_id if reply else None
        if users is None:
            # ``edit_delete`` needs the event as its first argument.
            return await edit_delete(event, f"Whom Should i {check}")
    # "Mute" -> "Muting", "Unmute" -> "Unmuting"
    await edit_or_reply(event, f"{check[:-1]}ing User in Group Call")
    user_list = []
    # ``split()`` without a separator drops the empty tokens produced by
    # repeated or trailing whitespace.
    for entity in str(users).split():
        cc = await parse_entity(entity)
        if isinstance(cc, User):
            user_list.append(cc)
    if not user_list:
        # Do not report success when nothing resolved to a user.
        return await edit_delete(event, f"No valid users to {check}", time=20)
    for user in user_list:
        await catub(
            functions.phone.EditGroupCallParticipantRequest(
                # cmd is "" for mute (muted=True) and "un" for unmute.
                call=gc_call, participant=user, muted=not cmd
            )
        )
    await edit_delete(event, f"{check}d users in Group Call")
@catub.cat_cmd(
    command=("vcunmute", plugin_category),
    info={
        "header": "To unmute users on Voice Chat.",
        "description": "To unmute a stream on Voice Chat",
        "usage": ["{tr}vcunmute < userid/username or reply to user>"],
        "examples": ["{tr}vcunmute @angelpro", "{tr}vcunmute userid1 userid2"],
    },
)
async def unmute_vc(event):
    """Help-entry registration for ``vcunmute``; the function has no body.

    No ``pattern`` is passed to the decorator. The ``vc(|un)mute`` pattern on
    ``mute_vc`` already matches ``vcunmute``.
    NOTE(review): presumably this stub exists only so the command shows up in
    the help listing - confirm how ``cat_cmd`` treats a missing pattern.
    """
@catub.cat_cmd(
    # Raw string: "\s" in a normal string literal is an invalid escape.
    pattern=r"(del|get|add)vcuser(?:\s|$)([\s\S]*)",
    command=("vcuser", plugin_category),
    info={
        "header": "To add user as for vc .",
        "usage": [
            "{tr}addvcuser <username/reply/mention>",
            "{tr}getvcuser",
            "{tr}delvcuser <username/reply/mention>",
        ],
    },
)
async def add_sudo_user(event):
    """Add, remove or list the users stored in the ``vcusers_list`` collection.

    The first pattern group selects the action:

    * ``get`` - reply with the stored users.
    * ``add`` / ``del`` - resolve the target from the argument or the
      replied-to message, update the collection, then reload the client.
    """
    vcusers = {}
    vc_chats = _vcusers_list()
    cmd = event.pattern_match.group(1)
    # ``get_collection`` may yield an object without ``.json`` (presumably
    # None when the collection does not exist); keep the empty dict then.
    with contextlib.suppress(AttributeError):
        vcusers = sql.get_collection("vcusers_list").json

    if cmd == "get":
        if not vc_chats:
            return await edit_delete(
                event, "__There are no vc auth users for your Catuserbot.__"
            )
        result = "**The list of vc auth users for your Catuserbot are :**\n\n"
        for user_id, data in vcusers.items():
            result += f"☞ **Name:** {mentionuser(data['chat_name'],data['chat_id'])}\n"
            result += f"**User Id :** `{user_id}`\n"
            # The f-string itself is always truthy, so the fallback has to
            # test the stored username rather than the formatted text.
            stored_username = data["chat_username"]
            username = f"@{stored_username}" if stored_username else "__None__"
            result += f"**Username :** {username}\n"
            result += f"Added on {data['date']}\n\n"
        await edit_or_reply(event, result)
        return

    # cmd is "add" or "del" from here on (the pattern allows nothing else).
    replied_user = (event.pattern_match.group(2) or "").strip()
    reply = await event.get_reply_message()
    if not replied_user and reply:
        replied_user = reply.from_id
    if not replied_user:
        # Covers both an empty argument without a reply and a reply whose
        # sender is unavailable; avoids calling get_entity("") or
        # get_entity(None).
        return await edit_delete(event, "`Can't fetch the user...`")
    replied_user = await catub.get_entity(replied_user)
    if not isinstance(replied_user, User):
        return await edit_delete(event, "`Can't fetch the user...`")

    date = datetime.now().strftime("%B %d, %Y")
    userdata = {
        "chat_id": replied_user.id,
        "chat_name": get_display_name(replied_user),
        "chat_username": replied_user.username,
        "date": date,
    }
    user_key = str(replied_user.id)
    if cmd == "add":
        if replied_user.id == event.client.uid:
            return await edit_delete(event, "__You already have the access.__.")
        if replied_user.id in (vc_chats + _sudousers_list()):
            return await edit_delete(
                event,
                f"{mentionuser(get_display_name(replied_user),replied_user.id)} __already have access .__",
            )
        vcusers[user_key] = userdata
    elif cmd == "del":
        if user_key not in vcusers:
            return await edit_delete(
                event,
                f"{mentionuser(get_display_name(replied_user),replied_user.id)} __is not in your vc auth list__.",
            )
        del vcusers[user_key]

    # Replace the stored collection with the updated mapping.
    sql.del_collection("vcusers_list")
    sql.add_collection("vcusers_list", vcusers, {})
    output = f"{mentionuser(userdata['chat_name'],userdata['chat_id'])} __is {'Added to' if cmd =='add' else 'Deleted from'} your vc auth users.__\n"
    output += "**Bot is reloading to apply the changes. Please wait for a minute**"
    msg = await edit_or_reply(event, output)
    await event.client.reload(msg)