Skip to content
This repository was archived by the owner on Jul 1, 2022. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions src/utils/custom_mouse.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Mostly copied from: https://github.com/patrikoss/pyclick
import mouse as _mouse
from mouse import _winmouse
from threading import Thread, Lock
import pytweening
import numpy as np
import random
Expand Down Expand Up @@ -183,15 +183,10 @@ def tweenPoints(self, points, tween, targetPoints):
res += points[index],
return res

mouse_move_thread = None
cancelled = False
mutex = Lock()
class mouse:
@staticmethod
def sleep(duration, get_now=time.perf_counter):
time.sleep(duration)
# now = get_now()
# end = now + duration
# while now < end:
# now = get_now()

@staticmethod
def _move_to(x, y, absolute=True, duration=0):
"""
Expand All @@ -216,18 +211,48 @@ def _move_to(x, y, absolute=True, duration=0):
dy = y - start_y

if dx == 0 and dy == 0:
mouse.sleep(duration)
time.sleep(duration)
else:
# 120 movements per second.
# Round and keep float to ensure float division in Python 2
steps = max(1.0, float(int(duration * 120.0)))
for i in range(int(steps)+1):
mouse.move(start_x + dx*i/steps, start_y + dy*i/steps)
mouse.sleep(duration/steps)
time.sleep(duration/steps)
else:
_winmouse.move_to(x, y)
_mouse._winmouse.move_to(x, y)

@staticmethod
def move_async(points: list, duration: float):
delta = duration / len(points)
for point in points:
with mutex:
if cancelled:
break
_mouse.move(point[0], point[1], duration=delta)

@staticmethod
def sync():
global mouse_move_thread
if mouse_move_thread is not None:
mouse_move_thread.join()
mouse_move_thread = None

@staticmethod
def stop():
global mouse_move_thread, cancelled
if mouse_move_thread is not None:
with mutex:
cancelled = True
mouse_move_thread.join()
mouse_move_thread = None
cancelled = False

@staticmethod
def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, delay_factor: tuple[float, float] = [0.9, 1.1], is_async: bool = False):
global mouse_move_thread
mouse.stop()

def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, delay_factor: tuple[float, float] = [0.4, 0.6]):
from_point = _mouse.get_position()
dist = math.dist((x, y), from_point)
offsetBoundaryX = max(10, int(0.08 * dist))
Expand All @@ -252,10 +277,15 @@ def move(x, y, absolute: bool = True, randomize: int | tuple[int, int] = 5, dela
human_curve = HumanCurve(from_point, (x, y), offsetBoundaryX=offsetBoundaryX, offsetBoundaryY=offsetBoundaryY, targetPoints=targetPoints)

duration = min(0.5, max(0.05, dist * 0.0004) * random.uniform(delay_factor[0], delay_factor[1]))
delta = duration / len(human_curve.points)

for point in human_curve.points:
_mouse.move(point[0], point[1], duration=delta)
if is_async and duration > 0.04:
with mutex:
mouse_move_thread = Thread(target = mouse.move_async, args = (human_curve.points, duration))
mouse_move_thread.start()
else:
delta = duration / len(human_curve.points)
for point in human_curve.points:
_mouse.move(point[0], point[1], duration=delta)

@staticmethod
def _is_clicking_safe():
Expand Down