| import asyncio
|
| import base64
|
| import os
|
| import random
|
| import logging
|
| import functools
|
| from typing import Any, Callable, Optional, Tuple, Union, TypeVar, Awaitable
|
|
|
| from playwright._impl._errors import Error as PlaywrightError
|
| from playwright._impl._errors import TimeoutError, TargetClosedError
|
| from playwright.async_api import Download, Page
|
| from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
|
|
|
|
|
|
|
|
| CUA_KEY_TO_PLAYWRIGHT_KEY = {
|
| "/": "Divide",
|
| "\\": "Backslash",
|
| "alt": "Alt",
|
| "arrowdown": "ArrowDown",
|
| "arrowleft": "ArrowLeft",
|
| "arrowright": "ArrowRight",
|
| "arrowup": "ArrowUp",
|
| "backspace": "Backspace",
|
| "capslock": "CapsLock",
|
| "cmd": "Meta",
|
| "ctrl": "Control",
|
| "delete": "Delete",
|
| "end": "End",
|
| "enter": "Enter",
|
| "esc": "Escape",
|
| "home": "Home",
|
| "insert": "Insert",
|
| "option": "Alt",
|
| "pagedown": "PageDown",
|
| "pageup": "PageUp",
|
| "shift": "Shift",
|
| "space": " ",
|
| "super": "Meta",
|
| "tab": "Tab",
|
| "win": "Meta",
|
| }
|
|
|
| F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
|
|
|
|
|
| def handle_target_closed(max_retries: int = 2, timeout_secs: int = 30):
|
| """
|
| Decorator to handle TargetClosedError and tunnel connection errors by attempting to recover the page.
|
|
|
| Args:
|
| max_retries: Maximum number of retry attempts
|
| timeout_secs: Timeout for page operations during recovery
|
| """
|
|
|
| def decorator(func: F) -> F:
|
| @functools.wraps(func)
|
| async def wrapper(*args, **kwargs):
|
|
|
| logger = args[0].logger
|
| page = None
|
| if len(args) >= 2 and hasattr(
|
| args[1], "url"
|
| ):
|
| page = args[1]
|
|
|
| retries = 0
|
| last_error = None
|
|
|
| while retries <= max_retries:
|
| try:
|
| return await func(*args, **kwargs)
|
| except (TargetClosedError, PlaywrightError) as e:
|
|
|
| is_tunnel_error = "net::ERR_TUNNEL_CONNECTION_FAILED" in str(e)
|
| is_target_closed = isinstance(
|
| e, TargetClosedError
|
| ) or "Target page, context or browser has been closed" in str(e)
|
|
|
| if not (is_tunnel_error or is_target_closed):
|
|
|
| raise e
|
|
|
| last_error = e
|
| retries += 1
|
|
|
| if retries > max_retries:
|
| raise e
|
|
|
| if page is None:
|
|
|
| raise e
|
|
|
| error_type = (
|
| "tunnel connection" if is_tunnel_error else "target closed"
|
| )
|
| logger.warning(
|
| f"{error_type} error in {func.__name__}, attempting recovery (retry {retries}/{max_retries})"
|
| )
|
|
|
| try:
|
|
|
| await _recover_page(page, timeout_secs, logger)
|
|
|
| await asyncio.sleep(0.5)
|
| except Exception as recovery_error:
|
| logger.error(f"Page recovery failed: {recovery_error}")
|
|
|
| raise e from recovery_error
|
|
|
|
|
| raise last_error
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|
|
|
| async def _recover_page(page: Page, timeout_secs: int = 30, logger=None) -> None:
|
| """
|
| Attempt to recover a closed page by reloading it.
|
|
|
| Args:
|
| page: The Playwright page object to recover
|
| timeout_secs: Timeout for recovery operations
|
| """
|
| logger = logger or logging.getLogger("playwright_controller")
|
| try:
|
|
|
| await page.evaluate("1", timeout=1000)
|
|
|
| return
|
| except Exception:
|
|
|
| pass
|
|
|
| try:
|
|
|
| await page.evaluate("window.stop()", timeout=2000)
|
| except Exception:
|
|
|
| pass
|
|
|
| try:
|
|
|
| await page.reload(timeout=timeout_secs * 1000)
|
| await page.wait_for_load_state("load", timeout=timeout_secs * 1000)
|
| logger.info("playwright_controller._recover_page(): Page recovery successful")
|
| except Exception as e:
|
| logger.error(f"playwright_controller._recover_page(): Page reload failed: {e}")
|
|
|
|
|
| try:
|
| current_url = page.url
|
| if current_url and current_url != "about:blank":
|
| await page.goto(current_url, timeout=timeout_secs * 1000)
|
| await page.wait_for_load_state("load", timeout=timeout_secs * 1000)
|
| logger.info(
|
| "playwright_controller._recover_page(): Page recovery via goto successful"
|
| )
|
| else:
|
| raise Exception(
|
| "playwright_controller._recover_page(): No valid URL to navigate to"
|
| )
|
| except Exception as goto_error:
|
| raise Exception(
|
| f"playwright_controller._recover_page(): All recovery methods failed. Reload error: {e}, Goto error: {goto_error}"
|
| )
|
|
|
|
|
|
|
| def handle_target_closed_with_context(max_retries: int = 2, timeout_secs: int = 30):
|
| """
|
| Enhanced decorator that can also handle browser context recreation.
|
| Use this for critical operations where you have access to the browser context.
|
| """
|
|
|
| def decorator(func: F) -> F:
|
| @functools.wraps(func)
|
| async def wrapper(*args, **kwargs):
|
| logger = args[0].logger
|
| page = None
|
| if len(args) >= 2 and hasattr(args[1], "url"):
|
| page = args[1]
|
|
|
| retries = 0
|
| last_error = None
|
|
|
| while retries <= max_retries:
|
| try:
|
| return await func(*args, **kwargs)
|
| except (TargetClosedError, PlaywrightError) as e:
|
|
|
| is_tunnel_error = "net::ERR_TUNNEL_CONNECTION_FAILED" in str(e)
|
| is_target_closed = isinstance(
|
| e, TargetClosedError
|
| ) or "Target page, context or browser has been closed" in str(e)
|
|
|
| if not (is_tunnel_error or is_target_closed):
|
|
|
| raise e
|
|
|
| last_error = e
|
| retries += 1
|
|
|
| if retries > max_retries:
|
| raise e
|
|
|
| if page is None:
|
| raise e
|
|
|
| error_type = (
|
| "tunnel connection" if is_tunnel_error else "target closed"
|
| )
|
| logger.warning(
|
| f"playwright_controller.handle_target_closed_with_context(): {error_type} error in {func.__name__}, attempting enhanced recovery (retry {retries}/{max_retries})"
|
| )
|
|
|
| try:
|
|
|
| context = page.context
|
| browser = context.browser
|
|
|
| if browser and not browser.is_connected():
|
|
|
| logger.error(
|
| "playwright_controller.handle_target_closed_with_context(): Browser connection lost - cannot recover automatically"
|
| )
|
| raise e
|
|
|
|
|
| await _recover_page(page, timeout_secs)
|
| await asyncio.sleep(0.5)
|
|
|
| except Exception as recovery_error:
|
| logger.error(
|
| f"playwright_controller.handle_target_closed_with_context(): Enhanced page recovery failed: {recovery_error}"
|
| )
|
| raise e from recovery_error
|
|
|
| raise last_error
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|
|
|
| class PlaywrightController:
|
| def __init__(
|
| self,
|
| animate_actions: bool = False,
|
| downloads_folder: Optional[str] = None,
|
| viewport_width: int = 1440,
|
| viewport_height: int = 900,
|
| _download_handler: Optional[Callable[[Download], None]] = None,
|
| to_resize_viewport: bool = True,
|
| single_tab_mode: bool = False,
|
| sleep_after_action: int = 10,
|
| timeout_load: int = 1,
|
| logger=None,
|
| ) -> None:
|
| """
|
| A controller for Playwright to interact with web pages.
|
| animate_actions: If True, actions will be animated.
|
| downloads_folder: The folder to save downloads to.
|
| viewport_width: The width of the viewport.
|
| viewport_height: The height of the viewport.
|
| _download_handler: A handler for downloads.
|
| to_resize_viewport: If True, the viewport will be resized.
|
| single_tab_mode (bool): If True, forces navigation to happen in the same tab rather than opening new tabs/windows.
|
|
|
| """
|
| self.animate_actions = animate_actions
|
| self.downloads_folder = downloads_folder
|
| self.viewport_width = viewport_width
|
| self.viewport_height = viewport_height
|
| self._download_handler = _download_handler
|
| self.to_resize_viewport = to_resize_viewport
|
| self.single_tab_mode = single_tab_mode
|
| self._sleep_after_action = sleep_after_action
|
| self._timeout_load = timeout_load
|
| self.logger = logger or logging.getLogger("playwright_controller")
|
|
|
|
|
| self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)
|
|
|
| async def sleep(self, page: Page, duration: Union[int, float]) -> None:
|
| await asyncio.sleep(duration)
|
|
|
| @handle_target_closed()
|
| async def on_new_page(self, page: Page) -> None:
|
| assert page is not None
|
|
|
| await page.bring_to_front()
|
| page.on("download", self._download_handler)
|
| if self.to_resize_viewport and self.viewport_width and self.viewport_height:
|
| await page.set_viewport_size(
|
| {"width": self.viewport_width, "height": self.viewport_height}
|
| )
|
| await self.sleep(page, 0.2)
|
| try:
|
| await page.wait_for_load_state(timeout=30000)
|
| except PlaywrightTimeoutError:
|
| self.logger.error("WARNING: Page load timeout, page might not be loaded")
|
|
|
| await page.evaluate("window.stop()")
|
|
|
| @handle_target_closed()
|
| async def _ensure_page_ready(self, page: Page) -> None:
|
| assert page is not None
|
| await self.on_new_page(page)
|
|
|
| @handle_target_closed()
|
| async def get_screenshot(self, page: Page, path: str | None = None) -> bytes:
|
| """
|
| Capture a screenshot of the current page.
|
|
|
| Args:
|
| page (Page): The Playwright page object.
|
| path (str, optional): The file path to save the screenshot. If None, the screenshot will be returned as bytes. Default: None
|
| """
|
| await self._ensure_page_ready(page)
|
| try:
|
| screenshot = await page.screenshot(path=path, timeout=15000)
|
| return screenshot
|
| except Exception:
|
| await page.evaluate("window.stop()")
|
|
|
| screenshot = await page.screenshot(path=path, timeout=15000)
|
| return screenshot
|
|
|
| @handle_target_closed()
|
| async def back(self, page: Page) -> None:
|
| await self._ensure_page_ready(page)
|
| await page.go_back()
|
|
|
| @handle_target_closed()
|
| async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
|
| await self._ensure_page_ready(page)
|
| reset_prior_metadata_hash = False
|
| reset_last_download = False
|
| try:
|
|
|
| await page.goto(url)
|
| await page.wait_for_load_state()
|
| reset_prior_metadata_hash = True
|
| except Exception as e_outer:
|
|
|
| if self.downloads_folder and "net::ERR_ABORTED" in str(e_outer):
|
| async with page.expect_download() as download_info:
|
| try:
|
| await page.goto(url)
|
| except Exception as e_inner:
|
| if "net::ERR_ABORTED" in str(e_inner):
|
| pass
|
| else:
|
| raise e_inner
|
| download = await download_info.value
|
| fname = os.path.join(
|
| self.downloads_folder, download.suggested_filename
|
| )
|
| await download.save_as(fname)
|
| message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{download.suggested_filename}' to local path:<br><br>{fname}</h1></body>"
|
| await page.goto(
|
| "data:text/html;base64,"
|
| + base64.b64encode(message.encode("utf-8")).decode("utf-8")
|
| )
|
| reset_last_download = True
|
| else:
|
| raise e_outer
|
| return reset_prior_metadata_hash, reset_last_download
|
|
|
| @handle_target_closed()
|
| async def page_down(
|
| self, page: Page, amount: int = 400, full_page: bool = False
|
| ) -> None:
|
| await self._ensure_page_ready(page)
|
| if full_page:
|
| await page.mouse.wheel(0, self.viewport_height - 50)
|
| else:
|
| await page.mouse.wheel(0, amount)
|
|
|
| @handle_target_closed()
|
| async def page_up(
|
| self, page: Page, amount: int = 400, full_page: bool = False
|
| ) -> None:
|
| await self._ensure_page_ready(page)
|
| if full_page:
|
| await page.mouse.wheel(0, -self.viewport_height + 50)
|
| else:
|
| await page.mouse.wheel(0, -amount)
|
|
|
| async def gradual_cursor_animation(
|
| self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
|
| ) -> None:
|
|
|
|
|
| await page.evaluate("""
|
| (function() {
|
| if (!document.getElementById('red-cursor')) {
|
| let cursor = document.createElement('div');
|
| cursor.id = 'red-cursor';
|
| cursor.style.width = '10px';
|
| cursor.style.height = '10px';
|
| cursor.style.backgroundColor = 'red';
|
| cursor.style.position = 'absolute';
|
| cursor.style.borderRadius = '50%';
|
| cursor.style.zIndex = '10000';
|
| document.body.appendChild(cursor);
|
| }
|
| })();
|
| """)
|
|
|
| steps = 20
|
| for step in range(steps):
|
| x = start_x + (end_x - start_x) * (step / steps)
|
| y = start_y + (end_y - start_y) * (step / steps)
|
|
|
| await page.evaluate(f"""
|
| (function() {{
|
| let cursor = document.getElementById('red-cursor');
|
| if (cursor) {{
|
| cursor.style.left = '{x}px';
|
| cursor.style.top = '{y}px';
|
| }}
|
| }})();
|
| """)
|
| await asyncio.sleep(0.05)
|
|
|
| self.last_cursor_position = (end_x, end_y)
|
| await asyncio.sleep(1.0)
|
|
|
| @handle_target_closed()
|
| async def click_coords(self, page: Page, x: float, y: float) -> None:
|
| new_page: Page | None = None
|
| await self._ensure_page_ready(page)
|
|
|
| if self.animate_actions:
|
|
|
| start_x, start_y = self.last_cursor_position
|
| await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
| await asyncio.sleep(0.1)
|
|
|
| try:
|
|
|
| async with page.expect_event("popup", timeout=1000) as page_info:
|
| await page.mouse.click(x, y, delay=10)
|
| new_page = await page_info.value
|
| assert isinstance(new_page, Page)
|
| await self.on_new_page(new_page)
|
| except TimeoutError:
|
| pass
|
| else:
|
| try:
|
|
|
| async with page.expect_event("popup", timeout=1000) as page_info:
|
| await page.mouse.click(x, y, delay=10)
|
| new_page = await page_info.value
|
| assert isinstance(new_page, Page)
|
| await self.on_new_page(new_page)
|
| except TimeoutError:
|
| pass
|
| return new_page
|
|
|
| @handle_target_closed()
|
| async def hover_coords(self, page: Page, x: float, y: float) -> None:
|
| """
|
| Hovers the mouse over the specified coordinates.
|
|
|
| Args:
|
| page (Page): The Playwright page object.
|
| x (float): The x coordinate to hover over.
|
| y (float): The y coordinate to hover over.
|
| """
|
| await self._ensure_page_ready(page)
|
|
|
| if self.animate_actions:
|
|
|
| start_x, start_y = self.last_cursor_position
|
| await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
| await asyncio.sleep(0.1)
|
|
|
| await page.mouse.move(x, y)
|
|
|
| @handle_target_closed()
|
| async def fill_coords(
|
| self,
|
| page: Page,
|
| x: float,
|
| y: float,
|
| value: str,
|
| press_enter: bool = True,
|
| delete_existing_text: bool = False,
|
| ) -> None:
|
| await self._ensure_page_ready(page)
|
| new_page: Page | None = None
|
|
|
| if self.animate_actions:
|
|
|
| start_x, start_y = self.last_cursor_position
|
| await self.gradual_cursor_animation(page, start_x, start_y, x, y)
|
| await asyncio.sleep(0.1)
|
|
|
| await page.mouse.click(x, y)
|
|
|
| if delete_existing_text:
|
| await page.keyboard.press("ControlOrMeta+A")
|
| await page.keyboard.press("Backspace")
|
|
|
|
|
| if len(value) < 100:
|
| delay_typing_speed = 50 + 100 * random.random()
|
| else:
|
| delay_typing_speed = 10
|
|
|
| if self.animate_actions:
|
| try:
|
|
|
| async with page.expect_event("popup", timeout=1000) as page_info:
|
| try:
|
| await page.keyboard.type(value)
|
| except PlaywrightError:
|
| await page.keyboard.type(value, delay=delay_typing_speed)
|
| if press_enter:
|
| await page.keyboard.press("Enter")
|
| new_page = await page_info.value
|
| assert isinstance(new_page, Page)
|
| await self.on_new_page(new_page)
|
| except TimeoutError:
|
| pass
|
| else:
|
| try:
|
|
|
| async with page.expect_event("popup", timeout=1000) as page_info:
|
| try:
|
| await page.keyboard.type(value)
|
| except PlaywrightError:
|
| await page.keyboard.type(value, delay=delay_typing_speed)
|
| if press_enter:
|
| await page.keyboard.press("Enter")
|
| new_page = await page_info.value
|
| assert isinstance(new_page, Page)
|
| await self.on_new_page(new_page)
|
| except TimeoutError:
|
| pass
|
|
|
| return new_page
|
|
|
| async def keypress(self, page: Page, keys: list[str]) -> None:
|
| """
|
| Press specified keys in sequence.
|
|
|
| Args:
|
| page (Page): The Playwright page object
|
| keys (List[str]): List of keys to press
|
| """
|
| await self._ensure_page_ready(page)
|
| mapped_keys = [CUA_KEY_TO_PLAYWRIGHT_KEY.get(key.lower(), key) for key in keys]
|
| try:
|
| for key in mapped_keys:
|
| await page.keyboard.down(key)
|
| for key in reversed(mapped_keys):
|
| await page.keyboard.up(key)
|
| except Exception as e:
|
| raise RuntimeError(
|
| f"I tried to keypress(keys={keys}), but I got an error: {e}"
|
| ) from None
|
|
|
| @handle_target_closed()
|
| async def wait_for_load_state(
|
| self, page: Page, state: str = "load", timeout: Optional[int] = None
|
| ) -> None:
|
| """Wait for the page to reach a specific load state."""
|
| await page.wait_for_load_state(state, timeout=timeout)
|
|
|
| @handle_target_closed()
|
| async def get_page_url(self, page: Page) -> str:
|
| """Get the current page URL."""
|
| await self._ensure_page_ready(page)
|
| return page.url
|
|
|