diff --git a/src/utils/custom_mouse.py b/src/utils/custom_mouse.py index 04956beda..e2998ad20 100644 --- a/src/utils/custom_mouse.py +++ b/src/utils/custom_mouse.py @@ -1,6 +1,6 @@ # Mostly copied from: https://github.com/patrikoss/pyclick import mouse as _mouse -from mouse import _winmouse +from threading import Thread, Lock import pytweening import numpy as np import random @@ -183,15 +183,10 @@ def tweenPoints(self, points, tween, targetPoints): res += points[index], return res +mouse_move_thread = None +cancelled = False +mutex = Lock() class mouse: - @staticmethod - def sleep(duration, get_now=time.perf_counter): - time.sleep(duration) - # now = get_now() - # end = now + duration - # while now < end: - # now = get_now() - @staticmethod def _move_to(x, y, absolute=True, duration=0): """ @@ -216,18 +211,48 @@ def _move_to(x, y, absolute=True, duration=0): dy = y - start_y if dx == 0 and dy == 0: - mouse.sleep(duration) + time.sleep(duration) else: # 120 movements per second. # Round and keep float to ensure float division in Python 2 steps = max(1.0, float(int(duration * 120.0))) for i in range(int(steps)+1): mouse.move(start_x + dx*i/steps, start_y + dy*i/steps) - mouse.sleep(duration/steps) + time.sleep(duration/steps) else: - _winmouse.move_to(x, y) + _mouse._winmouse.move_to(x, y) + + @staticmethod + def move_async(points: list, duration: float): + delta = duration / len(points) + for point in points: + with mutex: + if cancelled: + break + _mouse.move(point[0], point[1], duration=delta) + + @staticmethod + def sync(): + global mouse_move_thread + if mouse_move_thread is not None: + mouse_move_thread.join() + mouse_move_thread = None + + @staticmethod + def stop(): + global mouse_move_thread, cancelled + if mouse_move_thread is not None: + with mutex: + cancelled = True + mouse_move_thread.join() + mouse_move_thread = None + cancelled = False + + @staticmethod + def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, delay_factor: tuple[float, float] = [0.9, 1.1], is_async: bool = False): + global mouse_move_thread + mouse.stop() - def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, delay_factor: tuple[float, float] = [0.4, 0.6]): from_point = _mouse.get_position() dist = math.dist((x, y), from_point) offsetBoundaryX = max(10, int(0.08 * dist)) @@ -252,10 +277,15 @@ def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, dela human_curve = HumanCurve(from_point, (x, y), offsetBoundaryX=offsetBoundaryX, offsetBoundaryY=offsetBoundaryY, targetPoints=targetPoints) duration = min(0.5, max(0.05, dist * 0.0004) * random.uniform(delay_factor[0], delay_factor[1])) - delta = duration / len(human_curve.points) - for point in human_curve.points: - _mouse.move(point[0], point[1], duration=delta) + if is_async and duration > 0.04: + with mutex: + mouse_move_thread = Thread(target = mouse.move_async, args = (human_curve.points, duration)) + mouse_move_thread.start() + else: + delta = duration / len(human_curve.points) + for point in human_curve.points: + _mouse.move(point[0], point[1], duration=delta) @staticmethod def _is_clicking_safe():