forked from Stochastic-Adventure/RaydiumAPYScraper
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdriver.py
More file actions
39 lines (32 loc) · 1.2 KB
/
driver.py
File metadata and controls
39 lines (32 loc) · 1.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import os
import aiohttp
import asyncio
from aiolimiter import AsyncLimiter
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from LPInfo import RaydiumPoolInfo
from pprint import pprint
async def amain():
limiter = AsyncLimiter(15, 1)
task = []
async with aiohttp.ClientSession() as session:
async with limiter:
LP = RaydiumPoolInfo(session)
fee_apy_task = asyncio.create_task(LP.RAYDIUM.get_pair())
fee_apy = await fee_apy_task
for farm in LP.farms_info:
task.append(LP.get_APR(farm))
result = await asyncio.gather(*task)
for farm in result:
farm_name = list(farm.keys())[0]
farm[farm_name].update({"Fee_APR": fee_apy[farm_name]})
pprint(farm)
if __name__ == '__main__':
scheduler = AsyncIOScheduler()
scheduler.add_job(amain, 'cron', minute='*')
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass