Actions.json
(1) 物件偵測到"重試",信心門檻必須>=0.4,且5秒內不能重複動作(避免連續Action)
Action: 移動滑鼠至物件中間位置,然後按下滑鼠左鍵
(2) 物件偵測到"取消",信心門檻必須>=0.4,且5秒內不能重複動作(避免連續Action)
dwell_ms:該物件必須連續出現停留畫面超過5秒(只要有一次沒偵測到會重新歸零)
Action: 移動滑鼠至物件中間位置,然後按下滑鼠左鍵
ActionEngine 負責讀取物件偵測結果及檢查使用者提供的規則庫,
當目前所有規則都滿足時,則產生一個新任務ActionTask,
交給任務執行者ActionWorker,
它在一個background thread中執行任務,
例如滑鼠移動,滑鼠自動點擊等自動化執行工作
下面為兩個應用程式視窗,分別sid=0和1,對應hwnd=0x123, hwnd=0xABC
ActionTask建構子如下:
def __init__(self, hwnd: int, bbox:Tuple[int, int, int, int], actions: List[Dict]):
actions例如下面 [{"move_to":"center"}, {"click":"left"}] 表示移動滑鼠至(x1+x2)//2, (y1+y2)//2並按下滑鼠左鍵
ActionEngine decides whether to act based on your JSON rule (class, min_conf, dwell_ms, cooldown_ms).
If the gates pass, it creates an ActionTask and hands it to the worker.
-
ActionTask is the unit of work: which window (hwnd), which region (bbox in client coords), and the ordered list of actions to perform (move_to, click, key, sleep_ms).
-
ActionWorker is the executor.
It runs in one background thread, pulls tasks from a Queue, translates client → screen coordinates (via WinIO),
and performs the steps. Because it’s a single worker, actions are serialized and you avoid spawning many short-lived threads.
class ActionTask:
    """One job for the action worker: a target window, a region and the steps to run."""

    def __init__(self, hwnd: int, bbox: Tuple[int, int, int, int], actions: List[dict]):
        # hwnd:    handle of the window the steps are aimed at.
        # bbox:    (x1, y1, x2, y2) in client coordinates.
        # actions: ordered steps, e.g. [{"move_to": "center"}, {"click": "left"}, ...]
        self.hwnd, self.bbox, self.actions = hwnd, bbox, actions
class ActionWorker:
    """Single background worker that executes UI actions from a queue.

    Tasks are pulled one at a time by one daemon thread, so the steps of
    different tasks never interleave.
    """

    # Click button name -> name of the WinIO function that performs it.
    _CLICK_METHODS = {
        "left": "mouse_click_left",
        "right": "mouse_click_right",
        "middle": "mouse_click_middle",
    }

    def __init__(self, stop_evt: threading.Event, maxsize: int = 256):
        """Create the worker; the thread is not started until start() is called.

        Args:
            stop_evt: Event polled by the worker loop; once set, the loop exits.
            maxsize: Capacity of the task queue.
        """
        self.stop_evt = stop_evt
        self.q: "queue.Queue[ActionTask]" = queue.Queue(maxsize=maxsize)
        self.t = threading.Thread(target=self._run, daemon=True)
        self._started = False

    def start(self):
        """Start the worker thread; calling this more than once is a no-op."""
        if not self._started:
            self.t.start()
            self._started = True

    def shutdown(self):
        """Ask the worker loop to stop by setting the stop event.

        The thread is not joined here, and tasks still waiting in the queue
        when the loop notices the event are left unexecuted.
        """
        self.stop_evt.set()

    @staticmethod
    def _anchor_point(x1, y1, x2, y2, where: str):
        """Return the (x, y) point of box (x1, y1, x2, y2) named by `where`.

        `where` is case-insensitive; a falsy or unrecognised value means
        the centre of the box.
        """
        where = (where or "center").lower()
        corners = {
            "top_left": (x1, y1),
            "top_right": (x2, y1),
            "bottom_left": (x1, y2),
            "bottom_right": (x2, y2),
        }
        if where in corners:
            return corners[where]
        return int((x1 + x2) / 2), int((y1 + y2) / 2)

    def _exec_task(self, task: "ActionTask"):
        """Execute every step of `task`; runs in the worker thread, may block.

        Each step is a dict, and only the first recognised key is honoured,
        checked in this order: "sleep_ms", "move_to", "click", "key".
        Any exception is reported and aborts the rest of the task, so one
        bad task cannot take down the worker thread.
        """
        try:
            # Unpack inside the try: a malformed bbox must not escape and
            # kill the only worker thread.
            hwnd, (x1, y1, x2, y2), actions = task.hwnd, task.bbox, task.actions
            if hwnd:
                WinIO.focus_window(hwnd)
            for step in actions or []:
                if "sleep_ms" in step:
                    # Negative delays are clamped to zero.
                    time.sleep(max(0, int(step["sleep_ms"])) / 1000.0)
                elif "move_to" in step:
                    ax, ay = self._anchor_point(x1, y1, x2, y2, step["move_to"])
                    if hwnd:
                        # bbox is in client coordinates of hwnd.
                        sx, sy = WinIO.client_to_screen(hwnd, ax, ay)
                    else:
                        # No window: the coordinates are used as given.
                        sx, sy = ax, ay
                    WinIO.mouse_move(sx, sy)
                elif "click" in step:
                    btn = str(step["click"]).lower()
                    method_name = self._CLICK_METHODS.get(btn)
                    handler = getattr(WinIO, method_name, None) if method_name else None
                    if handler is None:
                        # Previously ignored silently; surface it so a bad
                        # rule is noticed.
                        print(f"[Actions] unsupported click button: {btn}")
                    else:
                        handler()
                elif "key" in step:
                    WinIO.key_press(str(step["key"]))
        except Exception as e:
            print(f"[Actions] worker error: {e}")

    def _run(self):
        """Worker loop: execute queued tasks until the stop event is set."""
        while not self.stop_evt.is_set():
            try:
                # Short timeout so the stop event is re-checked regularly.
                task = self.q.get(timeout=0.05)
            except queue.Empty:
                continue
            try:
                self._exec_task(task)
            finally:
                self.q.task_done()

    def submit(self, task: "ActionTask", *, drop_if_full: bool = True) -> bool:
        """Enqueue an action. Returns True if accepted.

        Args:
            task: The task to run on the worker thread.
            drop_if_full: When True, never block; a full queue rejects the
                task immediately. When False, wait up to 0.5 s for room.

        Returns:
            True if the task was queued, False if the queue was full.
        """
        try:
            if drop_if_full:
                self.q.put_nowait(task)
            else:
                self.q.put(task, timeout=0.5)
        except queue.Full:
            print("[Actions] queue full — dropping action")
            return False
        return True
class ActionEngine:
    """
    Matches detections against JSON rules and enqueues actions to ActionWorker.
    Enforces min_conf, cooldown_ms, and dwell_ms per (source_id, class).
    """

    def __init__(self, rules: List[dict], worker: "ActionWorker"):
        """Store the rule list (None is treated as empty) and the worker to feed."""
        self.rules = rules or []
        self.worker = worker
        self.last_fire: Dict[tuple, int] = {}  # (sid, class) -> last_ts_ms

    def consider(
        self,
        sid: int,
        class_name: str,
        conf: float,
        bbox_client: Tuple[int, int, int, int],
        hwnd: int,
        now_ms: int,
        dwell_elapsed_ms: Optional[int] = None,
    ):
        """Fire the first rule that accepts this detection, if any.

        Args:
            sid: Source id of the detection; part of the cooldown key.
            class_name: Detected class; compared with each rule's "class".
            conf: Detection confidence, checked against the rule's "min_conf".
            bbox_client: Detection box (x1, y1, x2, y2), passed on to the task.
            hwnd: Window handle passed on to the task.
            now_ms: Current timestamp in milliseconds, on any consistent clock.
            dwell_elapsed_ms: How long the object has been continuously
                present, or None if unknown. Rules with dwell_ms > 0 never
                fire when this is None.

        A rule fires when the class matches and the confidence, dwell and
        cooldown gates all pass. The fire time is recorded for (sid, class),
        and a task is submitted if the rule has a non-empty "actions" list.
        """
        key = (sid, class_name)
        # None means "never fired". Defaulting to 0 would wrongly block the
        # first action whenever now_ms is smaller than the cooldown, e.g. on
        # a monotonic clock or one relative to program start.
        last = self.last_fire.get(key)

        for r in self.rules:
            if str(r.get("class", "")).strip() != class_name:
                continue

            # Confidence gate
            if conf < float(r.get("min_conf", 0.0)):
                continue

            # Dwell gate (optional)
            need_dwell = int(r.get("dwell_ms", 0))
            if need_dwell > 0 and (
                dwell_elapsed_ms is None or dwell_elapsed_ms < need_dwell
            ):
                continue

            # Cooldown gate: applies only once this key has fired before.
            cooldown = int(r.get("cooldown_ms", 0))
            if cooldown > 0 and last is not None and (now_ms - last) < cooldown:
                continue

            # Passed all gates: remember fire time and enqueue task
            self.last_fire[key] = now_ms
            actions = r.get("actions", [])
            if actions:
                self.worker.submit(ActionTask(hwnd, bbox_client, actions))
            return  # fire first matching rule only