-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbot.py
184 lines (147 loc) · 6.31 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import logging
import random
import threading
import traceback
from datetime import datetime, timedelta
from urllib.request import urlopen
import cv2
import numpy as np
import pytesseract
import requests
from dotenv import dotenv_values
from telegram import ParseMode, Update
from telegram.ext import CallbackContext, CommandHandler, Updater
# Endpoints of the checkvisaslots service queried by this bot.
# SCREENSHOTS_FILE_URL is the base that screenshot paths are appended to.
SLOTS_URL = "https://app.checkvisaslots.com/slots/v1"
SCREENSHOTS_URL = "https://app.checkvisaslots.com/retrieve/v1"
SCREENSHOTS_FILE_URL = "https://cvs-all-files.s3.amazonaws.com"

# One API key per line. The context manager closes the file immediately, and
# blank lines are dropped so an empty key can never be picked at random.
with open("keys.txt") as keys_file:
    API_KEYS = [line.strip() for line in keys_file if line.strip()]

# Parse config.env once instead of re-reading it for every setting.
_CONFIG = dotenv_values("config.env")
CHAT_ID = _CONFIG.get("CHAT_ID")
BOT_TOKEN = _CONFIG.get("BOT_TOKEN")
# Interpret SEND_DUPES as a plain boolean flag rather than eval()-ing the raw
# config text; a missing or unrecognised value means False.
SEND_DUPES = (_CONFIG.get("SEND_DUPES") or "").strip().lower() in {"true", "1", "yes", "on"}

logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
def get_slot_header(api_key):
    """Build the HTTP headers sent with a request to the slots endpoint.

    Args:
        api_key: Value placed in the ``x-api-key`` header.

    Returns:
        A new dict holding the fixed browser-extension style headers plus
        the API key.
    """
    headers = {
        "authority": "app.checkvisaslots.com",
        "origin": "chrome-extension://beepaenfejnphdgnkmccjcfiieihhogl",
        "user-agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/109.0.0.0 Safari/537.36"
        ),
    }
    headers["x-api-key"] = api_key
    return headers
def get_screenshots_header(api_key):
    """Build the HTTP headers sent with a request to the screenshots endpoint.

    Args:
        api_key: Value placed in the ``x-api-key`` header.

    Returns:
        A new dict with the ``origin`` and ``x-api-key`` headers.
    """
    headers = dict(origin="https://checkvisaslots.com")
    headers["x-api-key"] = api_key
    return headers
def url_to_image(url, timeout=30):
    """Download an image and decode it into an OpenCV colour array.

    Args:
        url: Address of the image to fetch.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled download cannot block the caller indefinitely.

    Returns:
        The image as decoded by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If the downloaded bytes cannot be decoded as an image.
    """
    # The context manager closes the HTTP response even if reading fails.
    with urlopen(url, timeout=timeout) as resp:
        payload = resp.read()
    raw = np.asarray(bytearray(payload), dtype="uint8")
    image = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals failure by returning None; fail here with context
        # instead of handing None to the OCR step.
        raise ValueError(f"could not decode image downloaded from {url}")
    return image
def image_to_bytes(img):
    """Encode an image as JPEG and return the encoded data as ``bytes``."""
    # imencode returns a (success flag, buffer) pair; only the buffer is used.
    encoded = cv2.imencode(".jpg", img)[1]
    return encoded.tobytes()
def check_chennai_consulate(results):
    """Return the slot count and timestamp of the first CHENNAI entry.

    Args:
        results: Iterable of mappings, each with ``"visa_location"``,
            ``"slots"`` and ``"createdon"`` keys.

    Returns:
        A two-element list ``[slots, createdon]`` taken from the first entry
        whose ``"visa_location"`` equals ``"CHENNAI"``.

    Raises:
        IndexError: If no entry is for CHENNAI.
    """
    for entry in results:
        if entry["visa_location"] == "CHENNAI":
            return [entry["slots"], entry["createdon"]]
    # Same exception type as indexing an empty list, but with a message that
    # says what was actually missing.
    raise IndexError("no CHENNAI entry found in the slots response")
def get_chennai_screenshots(results):
    """Download every screenshot whose path mentions CHENNAI.

    Args:
        results: Iterable of mappings with ``"img_url"`` and ``"createdon"``
            keys.

    Returns:
        A list of ``[image, createdon]`` pairs, one per matching entry, in
        input order.
    """
    # First pass: pick out the matching paths without touching the network.
    matches = []
    for entry in results:
        if "CHENNAI" in entry["img_url"]:
            matches.append((entry["img_url"], entry["createdon"]))
    # Second pass: fetch each selected screenshot from the file host.
    return [[url_to_image(SCREENSHOTS_FILE_URL + path), created] for path, created in matches]
def gmt_to_ist(time_in_gmt):
    """Shift a datetime forward by 5 hours 30 minutes (GMT to IST)."""
    ist_offset = timedelta(minutes=330)
    return time_in_gmt + ist_offset
def _ist_clock_time(gmt_timestamp):
    """Convert an API timestamp string into a 12-hour IST clock string."""
    gmt_time = datetime.strptime(gmt_timestamp, "%a, %d %b %Y %H:%M:%S %Z")
    return gmt_to_ist(gmt_time).strftime("%I:%M:%S %p")


def run_once(bot, chat_id, old_log_text=None, old_log_pics=None, api_key=None, send_dupes=SEND_DUPES, timeout=30):
    """Poll the slots API once and report the Chennai result to a chat.

    Args:
        bot: Object exposing ``send_message`` and ``send_photo``.
        chat_id: Chat that receives the messages.
        old_log_text: Status line produced by the previous run; a run that
            yields the same line is not re-announced unless ``send_dupes``.
        old_log_pics: Screenshots collected by the previous run, used to
            avoid re-sending identical images unless ``send_dupes``.
        api_key: Key for both API calls; a random entry of ``API_KEYS`` is
            used when omitted.
        send_dupes: When true, send messages and photos even if unchanged.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        ``(log_text, log_pics)``: the status line of this run and the list of
        consular screenshots seen in this run.

    Raises:
        requests.RequestException: On network failure, timeout or an HTTP
            error status.
        IndexError: If the response holds no CHENNAI entry (raised by
            ``check_chennai_consulate``).
    """
    if old_log_pics is None:
        old_log_pics = []
    if api_key is None:
        api_key = random.choice(API_KEYS)

    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(SLOTS_URL, headers=get_slot_header(api_key), timeout=timeout)
    response.raise_for_status()
    # Decode with response.json() rather than eval(): the body comes from the
    # network and must never be executed as Python code.
    slot_count, created_on = check_chennai_consulate(response.json())

    log_text = f"{slot_count} slots available at {_ist_clock_time(created_on)}"
    logger.info(log_text)

    if send_dupes or log_text != old_log_text:
        if slot_count > 0:
            bot.send_message(chat_id=chat_id, text=f"🤯🤯🤯 {log_text} ({api_key}) @Syzygianinfern0 @M_N_Sathish")
        else:
            bot.send_message(chat_id=chat_id, text=f"{log_text} ({api_key})")

    log_pics = []
    if slot_count > 0:
        response = requests.get(SCREENSHOTS_URL, headers=get_screenshots_header(api_key), timeout=timeout)
        response.raise_for_status()
        for screenshot, timestamp in get_chennai_screenshots(response.json()):
            # Only screenshots whose OCR text mentions "consular" are relevant.
            text = pytesseract.image_to_string(screenshot)
            if "consular" not in text.lower():
                continue
            caption_time = _ist_clock_time(timestamp)
            if send_dupes or not any(np.array_equal(screenshot, each) for each in old_log_pics):
                bot.send_photo(
                    chat_id=chat_id,
                    photo=image_to_bytes(screenshot),
                    caption=caption_time + " @Syzygianinfern0 @M_N_Sathish",
                )
            # Remember every consular screenshot, sent or not, so the next
            # run can recognise it as already seen.
            log_pics.append(screenshot)
    return log_text, log_pics
def monitor(bot, chat_id, old_log_text=None, old_log_pics=None):
    """Run one monitoring round and schedule the next one on a timer.

    A successful round is repeated after 5 minutes, a failed one after
    1 minute. The next round is scheduled even when reporting the failure
    itself fails, so a transient outage cannot silently stop monitoring.

    Args:
        bot: Object exposing ``send_message`` (and whatever ``run_once``
            needs).
        chat_id: Chat that receives status and crash messages.
        old_log_text: Status line of the previous round; ``None`` on the
            first round, which triggers the "Monitoring Started!" message.
        old_log_pics: Screenshots collected by the previous round.
    """
    if old_log_pics is None:
        old_log_pics = []

    # Defaults for the failure path: keep the previous state, retry sooner.
    log_text, log_pics = old_log_text, old_log_pics
    minutes = 1
    try:
        if old_log_text is None:
            bot.send_message(chat_id=chat_id, text="<b>Monitoring Started!</b>", parse_mode=ParseMode.HTML)
            logger.info("Monitoring started!")
        log_text, log_pics = run_once(bot, chat_id, old_log_text, old_log_pics)
        minutes = 5
    except Exception as e:
        logger.exception("Monitoring round failed")
        try:
            report = f"Bot Crashed @Syzygianinfern0: {e}\n{traceback.format_exc()}"
            # Telegram rejects messages longer than 4096 characters, and
            # tracebacks easily exceed that.
            bot.send_message(chat_id=chat_id, text=report[:4096])
        except Exception:
            # The crash report is best-effort; it must not prevent the
            # next round from being scheduled below.
            logger.exception("Could not deliver the crash report")

    threading.Timer(
        60 * minutes,
        monitor,
        kwargs={
            "bot": bot,
            "chat_id": chat_id,
            "old_log_text": log_text,
            "old_log_pics": log_pics,
        },
    ).start()
def start_handler(update: Update, context: CallbackContext):
    """Answer the ``start`` command: greet the configured chat, reject others."""
    chat = update.effective_chat.id
    reply = "Hello!" if str(chat) == CHAT_ID else "Unauthorized!"
    context.bot.send_message(chat_id=chat, text=reply)
def run_once_handler(update: Update, context: CallbackContext):
    """Answer the ``run`` command by performing one slot check on demand.

    Only the configured chat may trigger a check; any other chat receives
    "Unauthorized!". A failing check is logged with its traceback and
    reported back to the requesting chat.
    """
    chat_id = update.effective_chat.id
    if str(chat_id) != CHAT_ID:
        context.bot.send_message(chat_id=chat_id, text="Unauthorized!")
        return

    try:
        run_once(context.bot, chat_id)
    except Exception as e:
        logger.exception("On-demand check failed")
        report = f"Bot Crashed @Syzygianinfern0: {e}\n{traceback.format_exc()}"
        # Telegram rejects messages longer than 4096 characters; truncate so
        # the report is delivered instead of raising again.
        context.bot.send_message(chat_id=chat_id, text=report[:4096])
def main():
    """Create the bot, announce startup, register commands and start services."""
    updater = Updater(token=BOT_TOKEN, use_context=True)
    dispatcher = updater.dispatcher
    bot = updater.bot

    # Let the configured chat know the process is up.
    bot.send_message(
        chat_id=CHAT_ID,
        text="<b>Bot Started!</b> @Syzygianinfern0 @M_N_Sathish",
        parse_mode=ParseMode.HTML,
    )
    logger.info("Bot started!")

    # Wire each command name to its callback.
    for command, callback in (("start", start_handler), ("run", run_once_handler)):
        dispatcher.add_handler(CommandHandler(command, callback))

    # The monitoring loop runs on its own thread.
    monitor_thread = threading.Thread(target=monitor, kwargs=dict(bot=bot, chat_id=CHAT_ID))
    monitor_thread.start()

    # Begin receiving updates for the command handlers.
    updater.start_polling()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()