-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathheartbeat.py
82 lines (65 loc) · 2.26 KB
/
heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# -*- coding:utf-8 -*-
"""
服务器心跳
Author: CyberQuant
Date: 2023/06/01
"""
import asyncio
from aed_quant.utils import tools
from aed_quant.utils import logger
from aed_quant.config import config
__all__ = ("heartbeat", )
class HeartBeat(object):
""" 心跳
"""
def __init__(self):
self._count = 0 # 心跳次数
self._interval = 1 # 服务心跳执行时间间隔(秒)
self._print_interval = config.heartbeat.get("interval", 0) # 心跳打印时间间隔(秒),0为不打印
self._tasks = {} # 跟随心跳执行的回调任务列表,由 self.register 注册 {task_id: {...}}
@property
def count(self):
return self._count
def ticker(self):
""" 启动心跳, 每秒执行一次
"""
self._count += 1
# 打印心跳次数
if self._print_interval > 0:
if self._count % self._print_interval == 0:
logger.info("do server heartbeat, count:", self._count, caller=self)
# 设置下一次心跳回调
asyncio.get_event_loop().call_later(self._interval, self.ticker)
# 执行任务回调
for task_id, task in self._tasks.items():
interval = task["interval"]
if self._count % interval != 0:
continue
func = task["func"]
args = task["args"]
kwargs = task["kwargs"]
kwargs["task_id"] = task_id
kwargs["heart_beat_count"] = self._count
asyncio.get_event_loop().create_task(func(*args, **kwargs))
def register(self, func, interval=1, *args, **kwargs):
""" 注册一个任务,在每次心跳的时候执行调用
@param func 心跳的时候执行的函数
@param interval 执行回调的时间间隔(秒)
@return task_id 任务id
"""
t = {
"func": func,
"interval": interval,
"args": args,
"kwargs": kwargs
}
task_id = tools.get_uuid1()
self._tasks[task_id] = t
return task_id
def unregister(self, task_id):
""" 注销一个任务
@param task_id 任务id
"""
if task_id in self._tasks:
self._tasks.pop(task_id)
heartbeat = HeartBeat()