Actions.json

(1) 物件偵測到"重試",信心門檻必須>=0.4  5秒內不能重複動作(避免連續Action)

Action: 移動滑鼠至物件中間位置,然後按下滑鼠左鍵

(2)  物件偵測到"取消",信心門檻必須>=0.4   5秒內不能重複動作(避免連續Action)

dwell_ms:該物件必須連續出現停留畫面超過5秒(只要有一次沒偵測到會重新歸零)

Action: 移動滑鼠至物件中間位置,然後按下滑鼠左鍵

 

ActionEngine 負責讀取物件偵測結果及檢查使用者提供的規則庫,

當目前所有規則都滿足時,則產生一個新任務ActionTask

交給任務執行者ActionWorker, 它是一個background thread執行,

例如滑鼠移動,滑鼠自動點擊等自動化執行工作 

下面為兩個應用程式視窗,分別sid=0和1,對應hwnd=0x123, hwnd=0xABC

ActionTask建構子如下: 

def __init__(self, hwnd: int, bbox:Tuple[int, int, int, int], actions: List[Dict]):

actions例如下面 [{"move_to":"center"}, {"click","left"}] 表示移動滑鼠至(x1+x2)//2, (y1+y2)//2並按下滑鼠左鍵

  • ActionEngine is the policy brain.

           It decides whether to act based on your JSON rule ( class , min_conf , dwell_ms , cooldown_ms ). 

           If the gates pass, it creates an ActionTask and hands it to the worker.

  • ActionTask is the unit of work: which window (hwnd), which region (bbox in client coords), and the ordered list of actions to perform (move_to, click, key, sleep_ms).

  • ActionWorker is the executor.

          It runs in one background thread, pulls tasks from a Queue, translates client → screen coordinates (via WinIO),

          and performs the steps. Because it’s a single worker, actions are serialized and you avoid spawning many short-lived threads.

 

 

class ActionTask:
    """Unit of work for the action worker."""
    def __init__(self, hwnd: int, bbox: Tuple[int,int,int,int], actions: List[dict]):
        self.hwnd = hwnd
        self.bbox = bbox          # client coords (x1,y1,x2,y2)
        self.actions = actions    # [{"move_to":"center"}, {"click":"left"}, ...]

class ActionWorker:
    """Single background worker that executes UI actions from a queue."""
    def __init__(self, stop_evt: threading.Event, maxsize: int = 256):
        self.stop_evt = stop_evt
        self.q: queue.Queue[ActionTask] = queue.Queue(maxsize=maxsize)
        self.t = threading.Thread(target=self._run, daemon=True)
        self._started = False

    def start(self):
        if not self._started:
            self.t.start()
            self._started = True

    def shutdown(self):
        # Loop watches stop_evt; nothing else needed
        self.stop_evt.set()

    @staticmethod
    def _anchor_point(x1, y1, x2, y2, where: str):
        where = (where or "center").lower()
        if   where == "top_left":     return x1, y1
        elif where == "top_right":    return x2, y1
        elif where == "bottom_left":  return x1, y2
        elif where == "bottom_right": return x2, y2
        return int((x1 + x2) / 2), int((y1 + y2) / 2)

    def _exec_task(self, task: ActionTask):
        """Runs in the worker thread; safe to block here."""
        hwnd, (x1,y1,x2,y2), actions = task.hwnd, task.bbox, task.actions
        try:
            if hwnd:
                WinIO.focus_window(hwnd)

            ax, ay = self._anchor_point(x1, y1, x2, y2, "center")

            for step in actions or []:
                if "sleep_ms" in step:
                    time.sleep(max(0, int(step["sleep_ms"])) / 1000.0)
                    continue

                if "move_to" in step:
                    ax, ay = self._anchor_point(x1, y1, x2, y2, step.get("move_to", "center"))
                    if hwnd:
                        sx, sy = WinIO.client_to_screen(hwnd, ax, ay)
                    else:
                        sx, sy = ax, ay
                    WinIO.mouse_move(sx, sy)
                    continue

                if "click" in step:
                    btn = str(step["click"]).lower()
                    if btn == "left":
                        WinIO.mouse_click_left()
                    elif btn == "right" and hasattr(WinIO, "mouse_click_right"):
                        WinIO.mouse_click_right()
                    elif btn == "middle" and hasattr(WinIO, "mouse_click_middle"):
                        WinIO.mouse_click_middle()
                    continue

                if "key" in step:
                    WinIO.key_press(str(step["key"]))
                    continue

        except Exception as e:
            print(f"[Actions] worker error: {e}")

    def _run(self):
        while not self.stop_evt.is_set():
            try:
                task = self.q.get(timeout=0.05)
            except Empty:
                continue
            try:
                self._exec_task(task)
            finally:
                self.q.task_done()

    def submit(self, task: ActionTask, *, drop_if_full: bool = True) -> bool:
        """Enqueue an action. Returns True if accepted."""
        try:
            self.q.put(task, block=not drop_if_full, timeout=0 if drop_if_full else 0.5)
            return True
        except Exception:
            print("[Actions] queue full — dropping action")
            return False


class ActionEngine:
    """
    Matches detections against JSON rules and enqueues actions to ActionWorker.
    Enforces min_conf, cooldown_ms, and dwell_ms per (source_id, class).
    """
    def __init__(self, rules: List[dict], worker: ActionWorker):
        self.rules = rules or []
        self.worker = worker
        self.last_fire: Dict[tuple, int] = {}  # (sid, class) -> last_ts_ms

    def consider(
        self,
        sid: int,
        class_name: str,
        conf: float,
        bbox_client: Tuple[int,int,int,int],
        hwnd: int,
        now_ms: int,
        dwell_elapsed_ms: Optional[int] = None,
    ):
        for r in self.rules:
            if str(r.get("class", "")).strip() != class_name:
                continue

            # Confidence gate
            if conf < float(r.get("min_conf", 0.0)):
                continue

            # Dwell gate (optional)
            need_dwell = int(r.get("dwell_ms", 0))
            if need_dwell > 0:
                if dwell_elapsed_ms is None or dwell_elapsed_ms < need_dwell:
                    continue

            # Cooldown gate
            cooldown = int(r.get("cooldown_ms", 0))
            key = (sid, class_name)
            last = self.last_fire.get(key, 0)
            if cooldown > 0 and (now_ms - last) < cooldown:
                continue

            # Passed all gates → remember fire time and enqueue task
            self.last_fire[key] = now_ms
            actions = r.get("actions", [])
            if actions:
                self.worker.submit(ActionTask(hwnd, bbox_client, actions))
            return  # fire first matching rule only