first commit

This commit is contained in:
DigiJ
2026-03-13 12:56:43 -07:00
commit 159cf9fcfe
309 changed files with 64584 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from ._multimodal_web_surfer import MultimodalWebSurfer
from .playwright_controller import PlaywrightController
# Names exported when this package is star-imported.
__all__ = ["MultimodalWebSurfer", "PlaywrightController"]

View File

@@ -0,0 +1,11 @@
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class WebSurferEvent:
    """Structured record describing something the web surfer did or observed.

    Instances are handed to a logger by the web surfer agent (for example when
    a screenshot is saved, the browser is reset, or a tool is executed).
    """

    # Identifier of the emitter; the visible call sites pass the agent's name.
    source: str
    # Human-readable description of the event.
    message: str
    # URL of the browser page at the time of the event.
    url: str
    # Name of the tool being executed, when the event describes a tool call.
    action: str | None = None
    # Decoded arguments of that tool call, when the event describes a tool call.
    arguments: Dict[str, Any] | None = None

View File

@@ -0,0 +1,988 @@
import asyncio
import base64
import hashlib
import io
import json
import logging
import os
import re
import sys
import time
import traceback
import warnings
from typing import (
Any,
AsyncGenerator,
Dict,
List,
Optional,
Sequence,
)
from urllib.parse import quote_plus
import aiofiles
import PIL.Image
from agentdhal_agentchat.agents import BaseChatAgent
from agentdhal_agentchat.base import Response
from agentdhal_agentchat.messages import BaseAgentEvent, BaseChatMessage, MultiModalMessage, TextMessage
from agentdhal_agentchat.utils import content_to_str, remove_images
from agentdhal_core import EVENT_LOGGER_NAME, CancellationToken, Component, ComponentModel, FunctionCall
from agentdhal_core import Image as AGImage
from agentdhal_core.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
ModelFamily,
RequestUsage,
SystemMessage,
UserMessage,
)
from PIL import Image
from playwright.async_api import BrowserContext, Download, Page, Playwright, async_playwright
from pydantic import BaseModel
from typing_extensions import Self
from ._events import WebSurferEvent
from ._prompts import (
WEB_SURFER_QA_PROMPT,
WEB_SURFER_QA_SYSTEM_MESSAGE,
WEB_SURFER_TOOL_PROMPT_MM,
WEB_SURFER_TOOL_PROMPT_TEXT,
)
from ._set_of_mark import add_set_of_mark
from ._tool_definitions import (
TOOL_CLICK,
TOOL_HISTORY_BACK,
TOOL_HOVER,
TOOL_READ_PAGE_AND_ANSWER,
TOOL_SCROLL_DOWN,
TOOL_SCROLL_UP,
TOOL_SLEEP,
TOOL_SUMMARIZE_PAGE,
TOOL_TYPE,
TOOL_VISIT_URL,
TOOL_WEB_SEARCH,
)
from ._types import InteractiveRegion, UserContent
from .playwright_controller import PlaywrightController
# Fallback context-window size (in tokens) used by the page summariser when the
# model client raises KeyError while computing the remaining token budget.
DEFAULT_CONTEXT_SIZE = 128000
class MultimodalWebSurferConfig(BaseModel):
    """Declarative configuration for :class:`MultimodalWebSurfer`.

    The fields correspond to the constructor arguments of the same name, except
    that the model client is carried as a ``ComponentModel`` rather than a live
    client object. ``MultimodalWebSurfer`` registers this class as its
    ``component_config_schema``.
    """

    # Name of the agent.
    name: str
    # Serialised description of the model client component.
    model_client: ComponentModel
    # Folder where downloads are saved; None means downloads are not saved.
    downloads_folder: str | None = None
    # Agent description; None leaves the choice to the consumer of this config.
    description: str | None = None
    # Directory for debug artefacts such as screenshots.
    debug_dir: str | None = None
    # Whether the browser runs without a visible window.
    headless: bool = True
    # Page opened when the browser starts.
    start_page: str | None = "https://www.bing.com/"
    # Whether browser actions are animated.
    animate_actions: bool = False
    # Whether screenshots are written to the debug directory.
    to_save_screenshots: bool = False
    # Whether OCR is used.
    use_ocr: bool = False
    # Browser channel forwarded to the browser launch call.
    browser_channel: str | None = None
    # User-data directory; when set, a persistent browser context is launched.
    browser_data_dir: str | None = None
    # Whether the page viewport is resized to the agent's fixed dimensions.
    to_resize_viewport: bool = True
class MultimodalWebSurfer(BaseChatAgent, Component[MultimodalWebSurferConfig]):
"""
MultimodalWebSurfer is a multimodal agent that acts as a web surfer that can search the web and visit web pages.
Installation:
.. code-block:: bash
pip install "agentdhal-ext[web-surfer]"
It launches a chromium browser and allows the playwright to interact with the web browser and can perform a variety of actions. The browser is launched on the first call to the agent and is reused for subsequent calls.
It must be used with a multimodal model client that supports function/tool calling, ideally GPT-4o currently.
When :meth:`on_messages` or :meth:`on_messages_stream` is called, the following occurs:
1) If this is the first call, the browser is initialized and the page is loaded. This is done in :meth:`_lazy_init`. The browser is only closed when :meth:`close` is called.
2) The method :meth:`_generate_reply` is called, which then creates the final response as below.
3) The agent takes a screenshot of the page, extracts the interactive elements, and prepares a set-of-mark screenshot with bounding boxes around the interactive elements.
4) The agent makes a call to the :attr:`model_client` with the SOM screenshot, history of messages, and the list of available tools.
- If the model returns a string, the agent returns the string as the final response.
- If the model returns a list of tool calls, the agent executes the tool calls with :meth:`_execute_tool` using :attr:`_playwright_controller`.
- The agent returns a final response which includes a screenshot of the page, page metadata, description of the action taken and the inner text of the webpage.
5) If at any point the agent encounters an error, it returns the error message as the final response.
.. note::
Please note that using the MultimodalWebSurfer involves interacting with a digital world designed for humans, which carries inherent risks.
Be aware that agents may occasionally attempt risky actions, such as recruiting humans for help or accepting cookie agreements without human involvement. Always ensure agents are monitored and operate within a controlled environment to prevent unintended consequences.
Moreover, be cautious that MultimodalWebSurfer may be susceptible to prompt injection attacks from webpages.
.. note::
On Windows, the event loop policy must be set to `WindowsProactorEventLoopPolicy` to avoid issues with subprocesses.
.. code-block:: python
import sys
import asyncio
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
Args:
name (str): The name of the agent.
model_client (ChatCompletionClient): The model client used by the agent. Must be multimodal and support function calling.
downloads_folder (str, optional): The folder where downloads are saved. Defaults to None, no downloads are saved.
description (str, optional): The description of the agent. Defaults to MultimodalWebSurfer.DEFAULT_DESCRIPTION.
debug_dir (str, optional): The directory where debug information is saved. Defaults to None.
headless (bool, optional): Whether the browser should be headless. Defaults to True.
start_page (str, optional): The start page for the browser. Defaults to MultimodalWebSurfer.DEFAULT_START_PAGE.
animate_actions (bool, optional): Whether to animate actions. Defaults to False.
to_save_screenshots (bool, optional): Whether to save screenshots. Defaults to False.
use_ocr (bool, optional): Whether to use OCR. Defaults to False.
browser_channel (str, optional): The browser channel. Defaults to None.
browser_data_dir (str, optional): The browser data directory. Defaults to None.
to_resize_viewport (bool, optional): Whether to resize the viewport. Defaults to True.
playwright (Playwright, optional): The playwright instance. Defaults to None.
context (BrowserContext, optional): The browser context. Defaults to None.
Example usage:
The following example demonstrates how to create a web surfing agent with
a model client and run it for multiple turns.
.. code-block:: python
import asyncio
from agentdhal_agentchat.ui import Console
from agentdhal_agentchat.teams import RoundRobinGroupChat
from agentdhal_extensions.models.openai import OpenAIChatCompletionClient
from agentdhal_extensions.agents.web_surfer import MultimodalWebSurfer
async def main() -> None:
# Define an agent
web_surfer_agent = MultimodalWebSurfer(
name="MultimodalWebSurfer",
model_client=OpenAIChatCompletionClient(model="gpt-4o-2024-08-06"),
)
# Define a team
agent_team = RoundRobinGroupChat([web_surfer_agent], max_turns=3)
# Run the team and stream messages to the console
stream = agent_team.run_stream(task="Navigate to the AutoGen readme on GitHub.")
await Console(stream)
# Close the browser controlled by the agent
await web_surfer_agent.close()
asyncio.run(main())
"""
# Component registration metadata: the component kind, the pydantic schema used
# for its configuration, and the provider path advertised for this class.
component_type = "agent"
component_config_schema = MultimodalWebSurferConfig
component_provider_override = "agentdhal_extensions.agents.web_surfer.MultimodalWebSurfer"
# Description used when the caller does not supply one to the constructor.
DEFAULT_DESCRIPTION = """
A helpful assistant with access to a web browser.
Ask them to perform web searches, open pages, and interact with content (e.g., clicking links, scrolling the viewport, filling in form fields, etc.).
It can also summarize the entire page, or answer questions based on the content of the page.
It can also be asked to sleep and wait for pages to load, in cases where the page seems not yet fully loaded.
"""
# Page opened on start-up (and on reset) when no start page is configured.
DEFAULT_START_PAGE = "https://www.bing.com/"
# Viewport dimensions
VIEWPORT_HEIGHT = 900
VIEWPORT_WIDTH = 1440
# Size of the image we send to the MLM
# Current values represent a 0.85 scaling to fit within the GPT-4v short-edge constraints (768px)
MLM_HEIGHT = 765
MLM_WIDTH = 1224
# Token budget held back for the screenshot when filling the summarisation
# prompt for a vision-capable model.
SCREENSHOT_TOKENS = 1105
def __init__(
    self,
    name: str,
    model_client: ChatCompletionClient,
    downloads_folder: str | None = None,
    description: str = DEFAULT_DESCRIPTION,
    debug_dir: str | None = None,
    headless: bool = True,
    start_page: str | None = DEFAULT_START_PAGE,
    animate_actions: bool = False,
    to_save_screenshots: bool = False,
    use_ocr: bool = False,
    browser_channel: str | None = None,
    browser_data_dir: str | None = None,
    to_resize_viewport: bool = True,
    playwright: Playwright | None = None,
    context: BrowserContext | None = None,
):
    """Store the configuration and prepare the browser controller.

    No browser is launched here: ``did_lazy_init`` starts out False and the
    page is created later by :meth:`_lazy_init`.

    Raises:
        ValueError: If screenshots are requested without a debug directory,
            or if the model client reports that it does not support
            function calling.
    """
    super().__init__(name, description)

    # Validate the arguments before storing anything.
    if to_save_screenshots and debug_dir is None:
        raise ValueError(
            "Cannot save screenshots without a debug directory. Set it using the 'debug_dir' parameter. The debug directory is created if it does not exist."
        )
    if model_client.model_info["function_calling"] is False:
        raise ValueError(
            "The model does not support function calling. MultimodalWebSurfer requires a model that supports function calling."
        )

    # Model and logging.
    self._model_client = model_client
    self.logger = logging.getLogger(EVENT_LOGGER_NAME + f".{self.name}.MultimodalWebSurfer")
    self._chat_history: List[LLMMessage] = []

    # Browser launch options.
    self.headless = headless
    self.browser_channel = browser_channel
    self.browser_data_dir = browser_data_dir
    self.start_page = start_page or self.DEFAULT_START_PAGE
    self.to_resize_viewport = to_resize_viewport
    self.animate_actions = animate_actions

    # Output locations and optional features.
    self.downloads_folder = downloads_folder
    self.debug_dir = debug_dir
    self.to_save_screenshots = to_save_screenshots
    self.use_ocr = use_ocr

    # Browser handles; a caller-supplied Playwright instance or context is
    # reused, anything left as None is created on first use.
    self._playwright: Playwright | None = playwright
    self._context: BrowserContext | None = context
    self._page: Page | None = None

    # Per-page bookkeeping.
    self._last_download: Download | None = None
    self._prior_metadata_hash: str | None = None

    # Remember the most recent download reported by the browser.
    def _download_handler(download: Download) -> None:
        self._last_download = download

    self._download_handler = _download_handler

    # The controller performs the actual browser interactions.
    self._playwright_controller = PlaywrightController(
        animate_actions=self.animate_actions,
        downloads_folder=self.downloads_folder,
        viewport_width=self.VIEWPORT_WIDTH,
        viewport_height=self.VIEWPORT_HEIGHT,
        _download_handler=self._download_handler,
        to_resize_viewport=self.to_resize_viewport,
    )

    # Tools that are always offered; the scroll tools are added per request.
    self.default_tools = [
        TOOL_VISIT_URL,
        TOOL_WEB_SEARCH,
        TOOL_HISTORY_BACK,
        TOOL_CLICK,
        TOOL_TYPE,
        TOOL_READ_PAGE_AND_ANSWER,
        TOOL_SUMMARIZE_PAGE,
        TOOL_SLEEP,
        TOOL_HOVER,
    ]

    # Flag to check if we have initialized the browser.
    self.did_lazy_init = False
async def _lazy_init(
    self,
) -> None:
    """Start the browser and open the start page on first use.

    Creates whichever of the Playwright instance and browser context were
    not supplied to the constructor, opens a new page, installs the page
    script and download handler, navigates to ``start_page`` and finally
    marks the agent as initialised.
    """
    # On Windows, warn when the event loop policy is not the proactor one.
    if sys.platform == "win32":
        active_policy = asyncio.get_event_loop_policy()
        has_proactor = hasattr(asyncio, "WindowsProactorEventLoopPolicy")
        if has_proactor and not isinstance(active_policy, asyncio.WindowsProactorEventLoopPolicy):
            warnings.warn(
                "The current event loop policy is not WindowsProactorEventLoopPolicy. "
                "This may cause issues with subprocesses. "
                "Try setting the event loop policy to WindowsProactorEventLoopPolicy. "
                "For example: `asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())`. "
                "See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.ProactorEventLoop.",
                stacklevel=2,
            )

    self._last_download = None
    self._prior_metadata_hash = None

    # Options shared by both launch modes.
    launch_args: Dict[str, Any] = {"headless": self.headless}
    if self.browser_channel is not None:
        launch_args["channel"] = self.browser_channel

    if self._playwright is None:
        self._playwright = await async_playwright().start()

    # Create the context unless one was supplied; a data directory selects
    # a persistent context.
    if self._context is None:
        chromium = self._playwright.chromium
        if self.browser_data_dir is not None:
            self._context = await chromium.launch_persistent_context(self.browser_data_dir, **launch_args)
        else:
            browser = await chromium.launch(**launch_args)
            self._context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
            )

    # Open the page.
    self._context.set_default_timeout(60000)  # One minute
    self._page = await self._context.new_page()
    assert self._page is not None
    self._page.on("download", self._download_handler)

    if self.to_resize_viewport:
        viewport_size = {"width": self.VIEWPORT_WIDTH, "height": self.VIEWPORT_HEIGHT}
        await self._page.set_viewport_size(viewport_size)

    # The page script lives next to this module.
    module_dir = os.path.abspath(os.path.dirname(__file__))
    await self._page.add_init_script(path=os.path.join(module_dir, "page_script.js"))

    await self._page.goto(self.start_page)
    await self._page.wait_for_load_state()

    # Prepare the debug directory, which stores the screenshots generated
    # throughout the process.
    await self._set_debug_dir(self.debug_dir)
    self.did_lazy_init = True
async def close(self) -> None:
    """Close the page, the browser context and Playwright.

    Should be called when the agent is no longer needed. Each resource is
    released even if closing an earlier one raises; the first error still
    propagates to the caller. Afterwards the agent is marked as
    uninitialised, so a later call starts a fresh browser instead of
    failing on the missing page.
    """
    try:
        if self._page is not None:
            await self._page.close()
    finally:
        self._page = None
        try:
            if self._context is not None:
                await self._context.close()
        finally:
            self._context = None
            try:
                if self._playwright is not None:
                    await self._playwright.stop()
            finally:
                self._playwright = None
                # Allow _lazy_init to run again on the next request.
                self.did_lazy_init = False
async def _set_debug_dir(self, debug_dir: str | None) -> None:
assert self._page is not None
if self.debug_dir is None:
return
if not os.path.isdir(self.debug_dir):
os.mkdir(self.debug_dir)
if self.to_save_screenshots:
current_timestamp = "_" + int(time.time()).__str__()
screenshot_png_name = "screenshot" + current_timestamp + ".png"
await self._page.screenshot(path=os.path.join(self.debug_dir, screenshot_png_name)) # type: ignore
self.logger.info(
WebSurferEvent(
source=self.name,
url=self._page.url,
message="Screenshot: " + screenshot_png_name,
)
)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
    """Message types this agent can return as its final chat message.

    A ``MultiModalMessage`` is produced after a browser action (text plus a
    screenshot). A ``TextMessage`` is produced when the reply is plain text
    or when an error is reported, so both are declared.
    """
    return (MultiModalMessage, TextMessage)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
    """Forget the conversation and return the browser to the start page.

    The chat history is always cleared, including when the browser was
    never started (for example because start-up failed after messages had
    already been recorded). The browser is only navigated when it exists.

    Args:
        cancellation_token: Accepted for interface compatibility; not used.
    """
    self._chat_history.clear()
    if not self.did_lazy_init:
        return
    assert self._page is not None

    reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
        self._page, self.start_page
    )
    if reset_last_download:
        self._last_download = None
    if reset_prior_metadata:
        self._prior_metadata_hash = None

    if self.to_save_screenshots:
        screenshot_png_name = f"screenshot_{int(time.time())}.png"
        await self._page.screenshot(path=os.path.join(self.debug_dir, screenshot_png_name))  # type: ignore
        self.logger.info(
            WebSurferEvent(
                source=self.name,
                url=self._page.url,
                message="Screenshot: " + screenshot_png_name,
            )
        )

    self.logger.info(
        WebSurferEvent(
            source=self.name,
            url=self._page.url,
            message="Resetting browser.",
        )
    )
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
    """Handle the messages and return only the final response.

    Consumes :meth:`on_messages_stream` and returns the first ``Response``
    it yields; everything else in the stream is discarded.

    Raises:
        AssertionError: If the stream ends without yielding a ``Response``.
    """
    stream = self.on_messages_stream(messages, cancellation_token)
    async for item in stream:
        if not isinstance(item, Response):
            continue
        return item
    raise AssertionError("The stream should have returned the final result.")
async def on_messages_stream(
    self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
    """Record the incoming messages, act in the browser and yield the result.

    Exactly one ``Response`` is yielded. On success it carries the reply
    (text, or text plus screenshot) together with the inner messages and
    the summed model usage. If generating the reply raises an ``Exception``
    the traceback is returned as a text response instead.

    The response is built inside the ``try`` block but yielded after it.
    Yielding inside a handler that catches ``BaseException`` would intercept
    the ``GeneratorExit`` raised when a consumer closes the generator and
    then yield again, which Python reports as a RuntimeError. Cancellation,
    ``KeyboardInterrupt`` and ``SystemExit`` are allowed to propagate rather
    than being converted into a chat message.

    Args:
        messages: New chat messages; each is appended to the chat history.
        cancellation_token: Forwarded to the reply generation.
    """
    for chat_message in messages:
        self._chat_history.append(chat_message.to_model_message())

    # Reset the per-request accumulators filled in while generating the reply.
    self.inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
    self.model_usage: List[RequestUsage] = []

    try:
        content = await self._generate_reply(cancellation_token=cancellation_token)
        self._chat_history.append(AssistantMessage(content=content_to_str(content), source=self.name))
        final_usage = RequestUsage(
            prompt_tokens=sum(u.prompt_tokens for u in self.model_usage),
            completion_tokens=sum(u.completion_tokens for u in self.model_usage),
        )
        if isinstance(content, str):
            response = Response(
                chat_message=TextMessage(content=content, source=self.name, models_usage=final_usage),
                inner_messages=self.inner_messages,
            )
        else:
            response = Response(
                chat_message=MultiModalMessage(content=content, source=self.name, models_usage=final_usage),
                inner_messages=self.inner_messages,
            )
    except Exception:
        error_text = f"Web surfing error:\n\n{traceback.format_exc()}"
        self._chat_history.append(AssistantMessage(content=error_text, source=self.name))
        response = Response(chat_message=TextMessage(content=error_text, source=self.name))

    yield response
async def _generate_reply(self, cancellation_token: CancellationToken) -> UserContent:
    """Ask the model what to do next and, if it picks a tool, execute it.

    The prompt sent to the model describes the current page (visible text,
    interactive targets, focus) and, for vision-capable models, includes a
    scaled set-of-mark screenshot. The most recent chat message is treated
    as the request and is placed after that prompt.

    Args:
        cancellation_token: Forwarded to the model client and tool execution.

    Returns:
        The model's text answer, or the observation produced by
        :meth:`_execute_tool` when the model requested a tool call.

    Raises:
        AssertionError: If the model response is neither a string nor a list.
    """
    # Lazy init, initialize the browser and the page on the first generate reply only
    if not self.did_lazy_init:
        await self._lazy_init()
    assert self._page is not None

    # Clone the messages, removing old screenshots
    history: List[LLMMessage] = remove_images(self._chat_history)

    # Split the history, removing the last message
    if history:
        user_request = history.pop()
    else:
        user_request = UserMessage(content="Empty request.", source="user")

    # Truncate the history for smaller models
    if self._model_client.model_info["family"] not in [
        ModelFamily.GPT_4O,
        ModelFamily.O1,
        ModelFamily.O3,
        ModelFamily.GPT_4,
        ModelFamily.GPT_35,
    ]:
        history = []

    # Ask the page for interactive elements, then prepare the set-of-mark screenshot
    rects = await self._playwright_controller.get_interactive_rects(self._page)
    viewport = await self._playwright_controller.get_visual_viewport(self._page)
    screenshot = await self._page.screenshot()
    som_screenshot, visible_rects, rects_above, rects_below = add_set_of_mark(screenshot, rects)

    if self.to_save_screenshots:
        screenshot_png_name = f"screenshot_som_{int(time.time())}.png"
        som_screenshot.save(os.path.join(self.debug_dir, screenshot_png_name))  # type: ignore
        self.logger.info(
            WebSurferEvent(
                source=self.name,
                url=self._page.url,
                message="Screenshot: " + screenshot_png_name,
            )
        )

    # What tools are available? Scrolling is only offered when there is
    # somewhere to scroll to (with a 5px tolerance).
    tools = self.default_tools.copy()
    if viewport["pageTop"] > 5:
        tools.append(TOOL_SCROLL_UP)
    if (viewport["pageTop"] + viewport["height"] + 5) < viewport["scrollHeight"]:
        tools.append(TOOL_SCROLL_DOWN)

    # Focus hint
    focused = await self._playwright_controller.get_focused_rect_id(self._page)
    focused_hint = ""
    if focused:
        focused_name = self._target_name(focused, rects)
        name_clause = f"(and name '{focused_name}') " if focused_name else ""
        role = "control"
        try:
            role = rects[focused]["role"]
        except KeyError:
            pass
        focused_hint = f"\nThe {role} with ID {focused} {name_clause}currently has the input focus.\n\n"

    # Everything visible
    visible_targets = "\n".join(self._format_target_list(visible_rects, rects)) + "\n\n"

    # Everything else, capped at 30 entries
    other_targets: List[str] = []
    other_targets.extend(self._format_target_list(rects_above, rects))
    other_targets.extend(self._format_target_list(rects_below, rects))
    if other_targets:
        if len(other_targets) > 30:
            other_targets = other_targets[0:30]
            other_targets.append("...")
        other_targets_str = (
            "Additional valid interaction targets include (but are not limited to):\n"
            + "\n".join(other_targets)
            + "\n\n"
        )
    else:
        other_targets_str = ""

    state_description = "Your " + await self._get_state_description()
    tool_names = "\n".join([t["name"] for t in tools])
    page_title = await self._page.title()

    # Both prompt templates take the same fields; only the template differs.
    uses_vision = self._model_client.model_info["vision"]
    template = WEB_SURFER_TOOL_PROMPT_MM if uses_vision else WEB_SURFER_TOOL_PROMPT_TEXT
    text_prompt = template.format(
        state_description=state_description,
        visible_targets=visible_targets,
        other_targets_str=other_targets_str,
        focused_hint=focused_hint,
        tool_names=tool_names,
        title=page_title,
        url=self._page.url,
    ).strip()
    # Collapse runs of blank lines
    text_prompt = re.sub(r"(\n\s*){3,}", "\n\n", text_prompt)

    if uses_vision:
        # Scale the screenshot for the MLM, and close the original
        scaled_screenshot = som_screenshot.resize((self.MLM_WIDTH, self.MLM_HEIGHT))
        som_screenshot.close()
        if self.to_save_screenshots:
            scaled_screenshot.save(os.path.join(self.debug_dir, "screenshot_scaled.png"))  # type: ignore
        prompt_message = UserMessage(
            content=[text_prompt, AGImage.from_pil(scaled_screenshot)],
            source=self.name,
        )
    else:
        # The marked-up image is not sent to text-only models; release it.
        som_screenshot.close()
        prompt_message = UserMessage(content=text_prompt, source=self.name)

    history.append(prompt_message)
    history.append(user_request)

    # Make the request
    response = await self._model_client.create(
        history, tools=tools, extra_create_args={"tool_choice": "auto"}, cancellation_token=cancellation_token
    )
    self.model_usage.append(response.usage)
    message = response.content
    self._last_download = None

    if isinstance(message, str):
        # Answer directly
        self.inner_messages.append(TextMessage(content=message, source=self.name))
        return message
    elif isinstance(message, list):
        # Take an action
        return await self._execute_tool(message, rects, tool_names, cancellation_token=cancellation_token)
    else:
        # Not sure what happened here
        raise AssertionError(f"Unknown response format '{message}'")
async def _execute_tool(
    self,
    message: List[FunctionCall],
    rects: Dict[str, InteractiveRegion],
    tool_names: str,
    cancellation_token: Optional[CancellationToken] = None,
) -> UserContent:
    """Run the first requested tool call and describe the resulting page.

    Only ``message[0]`` is executed; further tool calls in the list are
    ignored. The question-answering and summarisation tools return their
    text result directly. Every other tool is followed by a wait for the
    page to load, download handling, and a fresh observation.

    Args:
        message: Tool calls requested by the model.
        rects: Interactive regions of the page keyed by target ID, used to
            name the element an action was applied to.
        tool_names: Newline-separated tool names, quoted in the error
            raised for an unknown tool.
        cancellation_token: Forwarded to the summarisation request.

    Returns:
        Either a string (page Q&A / summary) or a two-item list holding the
        textual observation and a screenshot image.

    Raises:
        ValueError: If no tool call was supplied, a required ``url`` or
            ``query`` argument is missing, or the tool name is unknown.
    """
    if not message:
        raise ValueError("No tool call was provided to execute.")

    name = message[0].name
    args = json.loads(message[0].arguments)
    action_description = ""
    assert self._page is not None

    call_text = f"{name}( {json.dumps(args)} )"
    self.logger.info(
        WebSurferEvent(
            source=self.name,
            url=self._page.url,
            action=name,
            arguments=args,
            message=call_text,
        )
    )
    self.inner_messages.append(TextMessage(content=call_text, source=self.name))

    async def _navigate(target_url: str) -> None:
        """Visit ``target_url`` and clear cached state as the controller directs."""
        assert self._page is not None
        reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
            self._page, target_url
        )
        if reset_last_download:
            self._last_download = None
        if reset_prior_metadata:
            self._prior_metadata_hash = None

    if name == "visit_url":
        url = args.get("url")
        if not isinstance(url, str) or not url:
            raise ValueError("The 'visit_url' tool requires a non-empty string 'url' argument.")
        action_description = f"I typed '{url}' into the browser address bar."
        # Check if the argument starts with a known protocol
        if url.startswith(("https://", "http://", "file://", "about:")):
            await _navigate(url)
        # If the argument contains a space, treat it as a search query
        elif " " in url:
            await _navigate(f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH")
        # Otherwise, prefix with https://
        else:
            await _navigate("https://" + url)

    elif name == "history_back":
        action_description = "I clicked the browser back button."
        await self._playwright_controller.back(self._page)

    elif name == "web_search":
        query = args.get("query")
        if query is None:
            raise ValueError("The 'web_search' tool requires a 'query' argument.")
        query = str(query)
        action_description = f"I typed '{query}' into the browser search bar."
        await _navigate(f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH")

    elif name == "scroll_up":
        action_description = "I scrolled up one page in the browser."
        await self._playwright_controller.page_up(self._page)

    elif name == "scroll_down":
        action_description = "I scrolled down one page in the browser."
        await self._playwright_controller.page_down(self._page)

    elif name == "click":
        target_id = str(args.get("target_id"))
        target_name = self._target_name(target_id, rects)
        if target_name:
            action_description = f"I clicked '{target_name}'."
        else:
            action_description = "I clicked the control."
        new_page_tentative = await self._playwright_controller.click_id(self._page, target_id)
        if new_page_tentative is not None:
            # The click opened another page; continue working in it.
            self._page = new_page_tentative
            self._prior_metadata_hash = None
            self.logger.info(
                WebSurferEvent(
                    source=self.name,
                    url=self._page.url,
                    message="New tab or window.",
                )
            )

    elif name == "input_text":
        input_field_id = str(args.get("input_field_id"))
        text_value = str(args.get("text_value"))
        input_field_name = self._target_name(input_field_id, rects)
        if input_field_name:
            action_description = f"I typed '{text_value}' into '{input_field_name}'."
        else:
            action_description = f"I input '{text_value}'."
        await self._playwright_controller.fill_id(self._page, input_field_id, text_value)

    elif name == "scroll_element_up":
        target_id = str(args.get("target_id"))
        target_name = self._target_name(target_id, rects)
        if target_name:
            action_description = f"I scrolled '{target_name}' up."
        else:
            action_description = "I scrolled the control up."
        await self._playwright_controller.scroll_id(self._page, target_id, "up")

    elif name == "scroll_element_down":
        target_id = str(args.get("target_id"))
        target_name = self._target_name(target_id, rects)
        if target_name:
            action_description = f"I scrolled '{target_name}' down."
        else:
            action_description = "I scrolled the control down."
        await self._playwright_controller.scroll_id(self._page, target_id, "down")

    elif name == "answer_question":
        # Do Q&A on the DOM. No need to take further action. Browser state does not change.
        question = str(args.get("question"))
        return await self._summarize_page(question=question, cancellation_token=cancellation_token)

    elif name == "summarize_page":
        # Summarize the DOM. No need to take further action. Browser state does not change.
        return await self._summarize_page(cancellation_token=cancellation_token)

    elif name == "hover":
        target_id = str(args.get("target_id"))
        target_name = self._target_name(target_id, rects)
        if target_name:
            action_description = f"I hovered over '{target_name}'."
        else:
            action_description = "I hovered over the control."
        await self._playwright_controller.hover_id(self._page, target_id)

    elif name == "sleep":
        action_description = "I am waiting a short period of time before taking further action."
        await self._playwright_controller.sleep(self._page, 3)

    else:
        raise ValueError(f"Unknown tool '{name}'. Please choose from:\n\n{tool_names}")

    # Give the page time to settle after the action.
    await self._page.wait_for_load_state()
    await self._playwright_controller.sleep(self._page, 3)

    # Handle downloads: save the file and show a confirmation page instead.
    if self._last_download is not None and self.downloads_folder is not None:
        fname = os.path.join(self.downloads_folder, self._last_download.suggested_filename)
        await self._last_download.save_as(fname)  # type: ignore
        page_body = f"<html><head><title>Download Successful</title></head><body style=\"margin: 20px;\"><h1>Successfully downloaded '{self._last_download.suggested_filename}' to local path:<br><br>{fname}</h1></body></html>"
        await self._page.goto(
            "data:text/html;base64," + base64.b64encode(page_body.encode("utf-8")).decode("utf-8")
        )
        await self._page.wait_for_load_state()

    # Handle metadata: only report it when it differs from the last report.
    page_metadata = json.dumps(await self._playwright_controller.get_page_metadata(self._page), indent=4)
    # The digest is a change detector, not a security measure.
    metadata_hash = hashlib.md5(page_metadata.encode("utf-8"), usedforsecurity=False).hexdigest()
    if metadata_hash != self._prior_metadata_hash:
        page_metadata = (
            "\n\nThe following metadata was extracted from the webpage:\n\n" + page_metadata.strip() + "\n"
        )
    else:
        page_metadata = ""
    self._prior_metadata_hash = metadata_hash

    new_screenshot = await self._page.screenshot()
    if self.to_save_screenshots:
        screenshot_png_name = f"screenshot_{int(time.time())}.png"
        async with aiofiles.open(os.path.join(self.debug_dir, screenshot_png_name), "wb") as file:  # type: ignore
            await file.write(new_screenshot)  # type: ignore
        self.logger.info(
            WebSurferEvent(
                source=self.name,
                url=self._page.url,
                message="Screenshot: " + screenshot_png_name,
            )
        )

    # Return the complete observation
    state_description = "The " + await self._get_state_description()
    message_content = (
        f"{action_description}\n\n" + state_description + page_metadata + "\nHere is a screenshot of the page."
    )
    return [
        re.sub(r"(\n\s*){3,}", "\n\n", message_content),  # Removing blank lines
        AGImage.from_pil(PIL.Image.open(io.BytesIO(new_screenshot))),
    ]
async def _get_state_description(self) -> str:
assert self._playwright_controller is not None
assert self._page is not None
# Describe the viewport of the new page in words
viewport = await self._playwright_controller.get_visual_viewport(self._page)
percent_visible = int(viewport["height"] * 100 / viewport["scrollHeight"])
percent_scrolled = int(viewport["pageTop"] * 100 / viewport["scrollHeight"])
if percent_scrolled < 1: # Allow some rounding error
position_text = "at the top of the page"
elif percent_scrolled + percent_visible >= 99: # Allow some rounding error
position_text = "at the bottom of the page"
else:
position_text = str(percent_scrolled) + "% down from the top of the page"
visible_text = await self._playwright_controller.get_visible_text(self._page)
# Return the complete observation
page_title = await self._page.title()
message_content = f"web browser is open to the page [{page_title}]({self._page.url}).\nThe viewport shows {percent_visible}% of the webpage, and is positioned {position_text}\n"
message_content += f"The following text is visible in the viewport:\n\n{visible_text}"
return message_content
def _target_name(self, target: str, rects: Dict[str, InteractiveRegion]) -> str | None:
try:
return rects[target]["aria_name"].strip()
except KeyError:
return None
def _format_target_list(self, ids: List[str], rects: Dict[str, InteractiveRegion]) -> List[str]:
"""
Format the list of targets in the webpage as a string to be used in the agent's prompt.
"""
targets: List[str] = []
for r in list(set(ids)):
if r in rects:
# Get the role
aria_role = rects[r].get("role", "").strip()
if len(aria_role) == 0:
aria_role = rects[r].get("tag_name", "").strip()
# Get the name
aria_name = re.sub(r"[\n\r]+", " ", rects[r].get("aria_name", "")).strip()
# What are the actions?
actions = ['"click", "hover"']
if rects[r]["role"] in ["textbox", "searchbox", "search"]:
actions = ['"input_text"']
actions_str = "[" + ",".join(actions) + "]"
targets.append(f'{{"id": {r}, "name": "{aria_name}", "role": "{aria_role}", "tools": {actions_str} }}')
return targets
async def _summarize_page(
    self,
    question: str | None = None,
    cancellation_token: Optional[CancellationToken] = None,
) -> str:
    """Ask the model to summarize the current page, optionally with respect to a question.

    The page's markdown is appended to the prompt line by line until the model's
    remaining context would be exhausted. Vision-capable models additionally
    receive a scaled screenshot, for which ``SCREENSHOT_TOKENS`` of context are
    kept in reserve.

    Args:
        question: Optional question the summary should focus on.
        cancellation_token: Forwarded to the model client's ``create`` call.

    Returns:
        The model's text response, or ``"Nothing to summarize."`` when no page
        text fits into (or exists for) the prompt.
    """
    assert self._page is not None
    assert self._playwright_controller is not None

    page_markdown: str = await self._playwright_controller.get_page_markdown(self._page)

    # Prefer the page title; fall back to the URL if the title cannot be read.
    title: str = self._page.url
    try:
        title = await self._page.title()
    except Exception:
        pass

    is_multimodal = bool(self._model_client.model_info["vision"])

    # Prepare the system prompt
    messages: List[LLMMessage] = []
    messages.append(SystemMessage(content=WEB_SURFER_QA_SYSTEM_MESSAGE))

    prompt = WEB_SURFER_QA_PROMPT(title, question)

    # Grow the buffer (which is added to the prompt) until we overflow the context window or run out of lines
    buffer = ""
    # keepends=True preserves the line breaks, so the markdown structure survives concatenation
    for line in page_markdown.splitlines(keepends=True):
        trial_message = UserMessage(
            content=prompt + buffer + line,
            source=self.name,
        )

        try:
            remaining = self._model_client.remaining_tokens(messages + [trial_message])
        except KeyError:
            # Use the default if the model isn't found
            remaining = DEFAULT_CONTEXT_SIZE - self._model_client.count_tokens(messages + [trial_message])

        # Stop before overflowing the context window, whatever the model's modality
        if remaining <= 0:
            break

        # Vision models also need room for the screenshot
        if is_multimodal and remaining <= self.SCREENSHOT_TOKENS:
            break

        buffer += line

    # Nothing to do
    buffer = buffer.strip()
    if len(buffer) == 0:
        return "Nothing to summarize."

    if not is_multimodal:
        # Text only
        messages.append(
            UserMessage(
                content=prompt + buffer,
                source=self.name,
            )
        )
        response = await self._model_client.create(messages, cancellation_token=cancellation_token)
    else:
        # Multimodal: take a screenshot and scale it
        screenshot = Image.open(io.BytesIO(await self._page.screenshot()))
        try:
            scaled_screenshot = screenshot.resize((self.MLM_WIDTH, self.MLM_HEIGHT))
        finally:
            screenshot.close()

        # Release the scaled image even if the model call fails
        try:
            messages.append(
                UserMessage(
                    content=[
                        prompt + buffer,
                        AGImage.from_pil(scaled_screenshot),
                    ],
                    source=self.name,
                )
            )
            response = await self._model_client.create(messages, cancellation_token=cancellation_token)
        finally:
            scaled_screenshot.close()

    self.model_usage.append(response.usage)
    assert isinstance(response.content, str)
    return response.content
def _to_config(self) -> MultimodalWebSurferConfig:
    """Capture this agent's settings in a ``MultimodalWebSurferConfig``.

    The model client is stored in its dumped component form; every other
    field is copied from the attribute of the same name.
    """
    client_component = self._model_client.dump_component()
    return MultimodalWebSurferConfig(
        name=self.name,
        description=self.description,
        model_client=client_component,
        start_page=self.start_page,
        headless=self.headless,
        animate_actions=self.animate_actions,
        use_ocr=self.use_ocr,
        downloads_folder=self.downloads_folder,
        debug_dir=self.debug_dir,
        to_save_screenshots=self.to_save_screenshots,
        to_resize_viewport=self.to_resize_viewport,
        browser_channel=self.browser_channel,
        browser_data_dir=self.browser_data_dir,
    )
@classmethod
def _from_config(cls, config: MultimodalWebSurferConfig) -> Self:
    """Build an agent from a ``MultimodalWebSurferConfig``.

    The model client is re-created from its component form. A falsy
    ``description`` or ``start_page`` is replaced by the class default.
    """
    client = ChatCompletionClient.load_component(config.model_client)
    description = config.description or cls.DEFAULT_DESCRIPTION
    start_page = config.start_page or cls.DEFAULT_START_PAGE
    return cls(
        name=config.name,
        description=description,
        model_client=client,
        start_page=start_page,
        headless=config.headless,
        animate_actions=config.animate_actions,
        use_ocr=config.use_ocr,
        downloads_folder=config.downloads_folder,
        debug_dir=config.debug_dir,
        to_save_screenshots=config.to_save_screenshots,
        to_resize_viewport=config.to_resize_viewport,
        browser_channel=config.browser_channel,
        browser_data_dir=config.browser_data_dir,
    )

View File

@@ -0,0 +1,52 @@
# Tool-selection prompt template for the screenshot-based (multimodal) case.
# The brace placeholders ({state_description}, {visible_targets}, {other_targets_str},
# {focused_hint}, {tool_names}, {title}, {url}) are presumably filled in by the caller
# via str.format -- verify against the call site.
WEB_SURFER_TOOL_PROMPT_MM = """
{state_description}
Consider the following screenshot of the page. In this screenshot, interactive elements are outlined in bounding boxes of different colors. Each bounding box has a numeric ID label in the same color. Additional information about each visible label is listed below:
{visible_targets}{other_targets_str}{focused_hint}
You are to respond to my next request by selecting an appropriate tool from the following set, or by answering the question directly if possible:
{tool_names}
When deciding between tools, consider if the request can be best addressed by:
- the contents of the CURRENT VIEWPORT (in which case actions like clicking links, clicking buttons, inputting text, or hovering over an element, might be more appropriate)
- contents found elsewhere on the CURRENT WEBPAGE [{title}]({url}), in which case actions like scrolling, summarization, or full-page Q&A might be most appropriate
- on ANOTHER WEBSITE entirely (in which case actions like performing a new web search might be the best option)
My request follows:
"""
# Tool-selection prompt template for the text-only case: same placeholders as the
# multimodal template, but it lists the interactive components instead of referring
# to a screenshot. Placeholders are presumably filled via str.format -- verify against the caller.
WEB_SURFER_TOOL_PROMPT_TEXT = """
{state_description}
You have also identified the following interactive components:
{visible_targets}{other_targets_str}{focused_hint}
You are to respond to my next request by selecting an appropriate tool from the following set, or by answering the question directly if possible:
{tool_names}
When deciding between tools, consider if the request can be best addressed by:
- the contents of the CURRENT VIEWPORT (in which case actions like clicking links, clicking buttons, inputting text, or hovering over an element, might be more appropriate)
- contents found elsewhere on the CURRENT WEBPAGE [{title}]({url}), in which case actions like scrolling, summarization, or full-page Q&A might be most appropriate
- on ANOTHER WEBSITE entirely (in which case actions like performing a new web search might be the best option)
My request follows:
"""
# System message for the page summarization / question-answering request.
WEB_SURFER_QA_SYSTEM_MESSAGE = """
You are a helpful assistant that can summarize long documents to answer question.
"""
def WEB_SURFER_QA_PROMPT(title: str, question: str | None = None) -> str:
    """Build the user prompt that precedes the page text in a summarization request.

    Args:
        title: Title of the page being visited; quoted in the prompt.
        question: Optional question the summary should address. When ``None``
            a general summary is requested.

    Returns:
        The prompt text, ending in a blank line.
    """
    focus = "" if question is None else f" with respect to '{question}'"
    return (
        f"We are visiting the webpage '{title}'. Its full-text content are pasted below, along with a screenshot of the page's current viewport."
        f" Please summarize the webpage into one or two paragraphs{focus}:\n\n"
    )

View File

@@ -0,0 +1,96 @@
import io
import random
from typing import BinaryIO, Dict, List, Tuple, cast
from PIL import Image, ImageDraw, ImageFont
from ._types import DOMRectangle, InteractiveRegion
TOP_NO_LABEL_ZONE = 20  # Don't print any labels close to the top of the page
def add_set_of_mark(
    screenshot: bytes | Image.Image | io.BufferedIOBase, ROIs: Dict[str, InteractiveRegion]
) -> Tuple[Image.Image, List[str], List[str], List[str]]:
    """Annotate a screenshot with labelled boxes for the given interactive regions.

    Args:
        screenshot: The screenshot as an already-opened image, raw encoded bytes,
            or a binary file-like object.
        ROIs: Mapping from region identifier to its region record.

    Returns:
        The tuple produced by ``_add_set_of_mark``: the annotated image, followed
        by the identifiers of regions that are visible, above the screenshot, and
        below the screenshot.
    """
    # An image supplied by the caller is used directly and stays open; closing it is the caller's job.
    if isinstance(screenshot, Image.Image):
        return _add_set_of_mark(screenshot, ROIs)

    if isinstance(screenshot, bytes):
        screenshot = io.BytesIO(screenshot)

    # TODO: Not sure why this cast was needed, but by this point screenshot is a binary file-like object
    image = Image.open(cast(BinaryIO, screenshot))
    try:
        return _add_set_of_mark(image, ROIs)
    finally:
        # We opened this image, so release it even if annotation fails.
        image.close()
def _add_set_of_mark(
    screenshot: Image.Image, ROIs: Dict[str, InteractiveRegion]
) -> Tuple[Image.Image, List[str], List[str], List[str]]:
    """Draw labelled boxes over a grayscale copy of the screenshot.

    Each non-empty rectangle is classified by its midpoint: rectangles whose
    midpoint lies horizontally outside the image are ignored; the rest are
    recorded as above, below, or inside the image, and only those inside are
    drawn. A region id is appended once per qualifying rectangle.

    Returns:
        ``(annotated image, visible ids, ids above, ids below)``.
    """
    visible_rects: List[str] = []
    rects_above: List[str] = []  # Scroll up to see
    rects_below: List[str] = []  # Scroll down to see

    fnt = ImageFont.load_default(14)
    base = screenshot.convert("L").convert("RGBA")
    overlay = Image.new("RGBA", base.size)
    draw = ImageDraw.Draw(overlay)

    for region_id, region in ROIs.items():
        for rect in region["rects"]:
            # Empty rectangles
            if not rect:
                continue
            if rect["width"] * rect["height"] == 0:
                continue

            center_x = (rect["right"] + rect["left"]) / 2.0
            center_y = (rect["top"] + rect["bottom"]) / 2.0

            # Skip rectangles centred to the left or right of the image
            if not (0 <= center_x and center_x < base.size[0]):
                continue

            if center_y < 0:
                rects_above.append(region_id)
            elif center_y >= base.size[1]:
                rects_below.append(region_id)
            else:
                visible_rects.append(region_id)
                _draw_roi(draw, int(region_id), fnt, rect)

    comp = Image.alpha_composite(base, overlay)
    overlay.close()
    return comp, visible_rects, rects_above, rects_below
def _draw_roi(
    draw: ImageDraw.ImageDraw, idx: int, font: ImageFont.FreeTypeFont | ImageFont.ImageFont, rect: DOMRectangle
) -> None:
    """Draw one region's outline and its numeric label.

    The box is outlined in the colour derived from ``idx`` and filled with a
    translucent version of it. The label is anchored at the top-right corner,
    or at the bottom-right corner when the top edge is within
    ``TOP_NO_LABEL_ZONE`` of the top of the image.
    """
    color = _color(idx)
    red, green, blue = color[0], color[1], color[2]

    # Choose black or white text depending on how bright the label background is
    luminance = red * 0.3 + green * 0.59 + blue * 0.11
    if luminance > 90:
        text_color = (0, 0, 0, 255)
    else:
        text_color = (255, 255, 255, 255)

    if rect["top"] <= TOP_NO_LABEL_ZONE:
        label_location = (rect["right"], rect["bottom"])
        label_anchor = "rt"
    else:
        label_location = (rect["right"], rect["top"])
        label_anchor = "rb"

    label = str(idx)
    roi = ((rect["left"], rect["top"]), (rect["right"], rect["bottom"]))
    draw.rectangle(roi, outline=color, fill=(red, green, blue, 48), width=2)

    # TODO: Having trouble with these types being partially Unknown.
    bbox = draw.textbbox(label_location, label, font=font, anchor=label_anchor, align="center")  # type: ignore
    # Pad the label background by 3 pixels on every side
    bbox = (bbox[0] - 3, bbox[1] - 3, bbox[2] + 3, bbox[3] + 3)
    draw.rectangle(bbox, fill=color)

    # TODO: Having trouble with these types being partially Unknown.
    draw.text(label_location, label, fill=text_color, font=font, anchor=label_anchor, align="center")  # type: ignore
def _color(identifier: int) -> Tuple[int, int, int, int]:
rnd = random.Random(int(identifier))
color = [rnd.randint(0, 255), rnd.randint(125, 255), rnd.randint(0, 50)]
rnd.shuffle(color)
color.append(255)
return cast(Tuple[int, int, int, int], tuple(color))

View File

@@ -0,0 +1,317 @@
from typing import Any, Dict
from agentdhal_core.tools._base import ParametersSchema, ToolSchema
def _load_tool(tooldef: Dict[str, Any]) -> ToolSchema:
    """Convert a function-tool definition dictionary into a ``ToolSchema``.

    Args:
        tooldef: Dictionary with a ``"function"`` entry that holds the tool's
            ``"name"``, ``"description"`` and ``"parameters"`` (which in turn
            provides ``"properties"`` and ``"required"``).

    Returns:
        A ``ToolSchema`` whose parameters are always declared as an ``"object"``.
    """
    function = tooldef["function"]
    name = function["name"]
    description = function["description"]
    declared = function["parameters"]
    parameters = ParametersSchema(
        type="object",
        properties=declared["properties"],
        required=declared["required"],
    )
    return ToolSchema(name=name, description=description, parameters=parameters)
# Description text reused for the "reasoning" parameter of every tool schema below.
REASONING_TOOL_PROMPT = (
"A short description of the action to be performed and reason for doing so, do not mention the user."
)
# Schema for the "visit_url" tool; requires the "reasoning" and "url" string arguments.
TOOL_VISIT_URL: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "visit_url",
"description": "Navigate directly to a provided URL using the browser's address bar. Prefer this tool over other navigation techniques in cases where the user provides a fully-qualified URL (e.g., choose it over clicking links, or inputing queries into search boxes).",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"url": {
"type": "string",
"description": "The URL to visit in the browser.",
},
},
"required": ["reasoning", "url"],
},
},
}
)
# Schema for the "web_search" tool; requires the "reasoning" and "query" string arguments.
TOOL_WEB_SEARCH: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "web_search",
"description": "Performs a web search on Bing.com with the given query.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"query": {
"type": "string",
"description": "The web search query to use.",
},
},
"required": ["reasoning", "query"],
},
},
}
)
# Schema for the "history_back" tool; "reasoning" is its only argument.
TOOL_HISTORY_BACK: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "history_back",
"description": "Navigates back one page in the browser's history. This is equivalent to clicking the browser back button.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
},
},
}
)
# Schema for the "scroll_up" tool (whole-viewport scroll); "reasoning" is its only argument.
TOOL_SCROLL_UP: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "scroll_up",
"description": "Scrolls the entire browser viewport one page UP towards the beginning.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
},
},
}
)
# Schema for the "scroll_down" tool (whole-viewport scroll); "reasoning" is its only argument.
TOOL_SCROLL_DOWN: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "scroll_down",
"description": "Scrolls the entire browser viewport one page DOWN towards the end.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
},
},
}
)
# Schema for the "click" tool; requires "reasoning" and the integer "target_id".
TOOL_CLICK: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "click",
"description": "Clicks the mouse on the target with the given id.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
"description": "The numeric id of the target to click.",
},
},
"required": ["reasoning", "target_id"],
},
},
}
)
# Schema for the "input_text" tool; requires "reasoning", the integer "input_field_id"
# and the string "text_value". Note the constant is named TOOL_TYPE while the tool name is "input_text".
TOOL_TYPE: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "input_text",
"description": "Types the given text value into the specified field.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"input_field_id": {
"type": "integer",
"description": "The numeric id of the input field to receive the text.",
},
"text_value": {
"type": "string",
"description": "The text to type into the input field.",
},
},
"required": ["reasoning", "input_field_id", "text_value"],
},
},
}
)
# Schema for the "scroll_element_down" tool (scrolls one element rather than the viewport);
# requires "reasoning" and the integer "target_id".
TOOL_SCROLL_ELEMENT_DOWN: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "scroll_element_down",
"description": "Scrolls a given html element (e.g., a div or a menu) DOWN.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
"description": "The numeric id of the target to scroll down.",
},
},
"required": ["reasoning", "target_id"],
},
},
}
)
# Schema for the "scroll_element_up" tool (scrolls one element rather than the viewport);
# requires "reasoning" and the integer "target_id".
TOOL_SCROLL_ELEMENT_UP: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "scroll_element_up",
"description": "Scrolls a given html element (e.g., a div or a menu) UP.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
"description": "The numeric id of the target to scroll UP.",
},
},
"required": ["reasoning", "target_id"],
},
},
}
)
# Schema for the "hover" tool; requires "reasoning" and the integer "target_id".
TOOL_HOVER: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "hover",
"description": "Hovers the mouse over the target with the given id.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
"description": "The numeric id of the target to hover over.",
},
},
"required": ["reasoning", "target_id"],
},
},
}
)
# Schema for the "answer_question" tool; requires the "reasoning" and "question" string arguments.
# Note the constant is named TOOL_READ_PAGE_AND_ANSWER while the tool name is "answer_question".
TOOL_READ_PAGE_AND_ANSWER: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "answer_question",
"description": "Uses AI to answer a question about the current webpage's content.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"question": {
"type": "string",
"description": "The question to answer.",
},
},
"required": ["reasoning", "question"],
},
},
}
)
# Schema for the "summarize_page" tool; "reasoning" is its only argument.
TOOL_SUMMARIZE_PAGE: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "summarize_page",
"description": "Uses AI to summarize the entire page.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
},
},
}
)
# Schema for the "sleep" tool; "reasoning" is its only argument (no duration is exposed).
TOOL_SLEEP: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "sleep",
"description": "Wait a short period of time. Call this function if the page has not yet fully loaded, or if it is determined that a small delay would increase the task's chances of success.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
},
},
}
)

View File

@@ -0,0 +1,106 @@
from typing import Any, Dict, List, TypedDict, Union
from agentdhal_core import FunctionCall, Image
from agentdhal_core.models import FunctionExecutionResult
# Content type aliases, one per message role:
# user content is a string or a list mixing strings and images,
UserContent = Union[str, List[Union[str, Image]]]
# assistant content is a string or a list of function calls,
AssistantContent = Union[str, List[FunctionCall]]
# function-execution content is a list of execution results,
FunctionExecutionContent = List[FunctionExecutionResult]
# and system content is always a plain string.
SystemContent = str
class DOMRectangle(TypedDict):
    """Numeric fields of a rectangle reported by the page script.

    Holds the origin (``x``, ``y``), the size (``width``, ``height``) and the
    four edges (``top``, ``right``, ``bottom``, ``left``).
    """

    # Origin and size
    x: Union[int, float]
    y: Union[int, float]
    width: Union[int, float]
    height: Union[int, float]
    # Edges
    top: Union[int, float]
    right: Union[int, float]
    bottom: Union[int, float]
    left: Union[int, float]
class VisualViewport(TypedDict):
    """Numeric viewport and document measurements reported by the page script.

    The field names mirror the keys of the object the script returns, which is
    why they are camelCase.
    """

    # Size of the viewport
    height: Union[int, float]
    width: Union[int, float]
    # Offsets and position of the viewport
    offsetLeft: Union[int, float]
    offsetTop: Union[int, float]
    pageLeft: Union[int, float]
    pageTop: Union[int, float]
    scale: Union[int, float]
    # Client and scroll sizes
    clientWidth: Union[int, float]
    clientHeight: Union[int, float]
    scrollWidth: Union[int, float]
    scrollHeight: Union[int, float]
class InteractiveRegion(TypedDict):
    """Description of one interactive element found on the page."""

    # Tag name and role of the element
    tag_name: str
    role: str
    # Human-readable name of the element
    aria_name: str
    # Whether the element can be scrolled vertically
    v_scrollable: bool
    # Rectangles occupied by the element
    rects: List[DOMRectangle]
# Helper functions for dealing with JSON. Not sure there's a better way?
def _get_str(d: Any, k: str) -> str:
val = d[k]
assert isinstance(val, str)
return val
def _get_number(d: Any, k: str) -> Union[int, float]:
val = d[k]
assert isinstance(val, int) or isinstance(val, float)
return val
def _get_bool(d: Any, k: str) -> bool:
val = d[k]
assert isinstance(val, bool)
return val
def domrectangle_from_dict(rect: Dict[str, Any]) -> DOMRectangle:
    """Build a ``DOMRectangle`` from an untyped dictionary.

    Every field is read through ``_get_number``, so a missing key or a
    non-numeric value is rejected there.
    """
    return {
        "x": _get_number(rect, "x"),
        "y": _get_number(rect, "y"),
        "width": _get_number(rect, "width"),
        "height": _get_number(rect, "height"),
        "top": _get_number(rect, "top"),
        "right": _get_number(rect, "right"),
        "bottom": _get_number(rect, "bottom"),
        "left": _get_number(rect, "left"),
    }
def interactiveregion_from_dict(region: Dict[str, Any]) -> InteractiveRegion:
    """Build an ``InteractiveRegion`` from an untyped dictionary.

    The incoming record uses the hyphenated keys ``"aria-name"`` and
    ``"v-scrollable"``; they are stored under the underscore field names.
    """
    typed_rects: List[DOMRectangle] = [domrectangle_from_dict(rect) for rect in region["rects"]]
    return {
        "tag_name": _get_str(region, "tag_name"),
        "role": _get_str(region, "role"),
        "aria_name": _get_str(region, "aria-name"),
        "v_scrollable": _get_bool(region, "v-scrollable"),
        "rects": typed_rects,
    }
def visualviewport_from_dict(viewport: Dict[str, Any]) -> VisualViewport:
    """Build a ``VisualViewport`` from an untyped dictionary.

    Every field is read through ``_get_number``, so a missing key or a
    non-numeric value is rejected there.
    """
    return {
        "height": _get_number(viewport, "height"),
        "width": _get_number(viewport, "width"),
        "offsetLeft": _get_number(viewport, "offsetLeft"),
        "offsetTop": _get_number(viewport, "offsetTop"),
        "pageLeft": _get_number(viewport, "pageLeft"),
        "pageTop": _get_number(viewport, "pageTop"),
        "scale": _get_number(viewport, "scale"),
        "clientWidth": _get_number(viewport, "clientWidth"),
        "clientHeight": _get_number(viewport, "clientHeight"),
        "scrollWidth": _get_number(viewport, "scrollWidth"),
        "scrollHeight": _get_number(viewport, "scrollHeight"),
    }

View File

@@ -0,0 +1,429 @@
var MultimodalWebSurfer = MultimodalWebSurfer || (function() {
// Next numeric label to hand out; labelElements stores it in an element's
// "__elementId" attribute and then increments it.
let nextLabel = 10;
// Fallback roles used by getApproximateAriaRole when an element has no explicit
// "role" attribute. Keys are lowercase tag names; for <input> elements with a
// "type" attribute the key is "input, type=<type>".
let roleMapping = {
"a": "link",
"area": "link",
"button": "button",
"input, type=button": "button",
"input, type=checkbox": "checkbox",
"input, type=email": "textbox",
"input, type=number": "spinbutton",
"input, type=radio": "radio",
"input, type=range": "slider",
"input, type=reset": "button",
"input, type=search": "searchbox",
"input, type=submit": "button",
"input, type=tel": "textbox",
"input, type=text": "textbox",
"input, type=url": "textbox",
"search": "search",
"select": "combobox",
"option": "option",
"textarea": "textbox"
};
// Return the computed CSS "cursor" value of an element.
let getCursor = function(elm) {
    const computed = window.getComputedStyle(elm);
    return computed.cursor;
};
// Collect the elements of the page that look interactive, without duplicates:
//   1. form controls, links and elements with click handlers, contenteditable or a tab stop,
//   2. elements with an explicit interactive "role" attribute,
//   3. elements whose computed cursor suggests interactivity.
// Returns an array of elements in the order in which they were discovered.
let getInteractiveElements = function() {
    let results = [];
    // Mirror of `results` for O(1) membership tests (indexOf on the array is quadratic on large pages)
    let seen = new Set();
    let addUnique = function(node) {
        if (!seen.has(node)) {
            seen.add(node);
            results.push(node);
        }
    };

    let roles = ["scrollbar", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem", "button", "checkbox", "gridcell", "link", "menuitem", "menuitemcheckbox", "menuitemradio", "option", "progressbar", "radio", "textbox", "combobox", "menu", "tree", "treegrid", "grid", "listbox", "radiogroup", "widget"];
    let inertCursors = ["auto", "default", "none", "text", "vertical-text", "not-allowed", "no-drop"];

    // Get the main interactive elements
    let nodeList = document.querySelectorAll("input, select, textarea, button, [href], [onclick], [contenteditable], [tabindex]:not([tabindex='-1'])");
    for (let i = 0; i < nodeList.length; i++) {
        addUnique(nodeList[i]);
    }

    // Anything not already included that has a suitable role
    nodeList = document.querySelectorAll("[role]");
    for (let i = 0; i < nodeList.length; i++) {
        let role = nodeList[i].getAttribute("role");
        if (roles.indexOf(role) > -1) {
            addUnique(nodeList[i]);
        }
    }

    // Any element that changes the cursor to something implying interactivity
    nodeList = document.querySelectorAll("*");
    for (let i = 0; i < nodeList.length; i++) {
        let node = nodeList[i];

        // Cursor is default, or does not suggest interactivity
        let cursor = getCursor(node);
        if (inertCursors.indexOf(cursor) >= 0) {
            continue;
        }

        // Move up to the first instance of this cursor change.
        // `parent` is declared locally (an undeclared assignment would overwrite window.parent),
        // and parentElement is used so the climb stops at the root element instead of reaching
        // the document, for which getComputedStyle throws.
        let parent = node.parentElement;
        while (parent && getCursor(parent) == cursor) {
            node = parent;
            parent = node.parentElement;
        }

        // Add the node if it is new
        addUnique(node);
    }

    return results;
};
// Give every element that does not have one yet a fresh "__elementId" attribute.
// Labels come from the shared nextLabel counter, so existing labels are never reused.
let labelElements = function(elements) {
    for (let idx = 0; idx < elements.length; idx++) {
        const elm = elements[idx];
        if (elm.hasAttribute("__elementId")) {
            continue;
        }
        elm.setAttribute("__elementId", "" + (nextLabel++));
    }
};
// Report whether `element` (or one of its descendants) is what a hit test at (x, y) returns.
let isTopmost = function(element, x, y) {
    const hit = document.elementFromPoint(x, y);

    // Hack to handle elements outside the viewport
    if (hit === null) {
        return true;
    }

    // Walk from the hit node up through its ancestors looking for the element
    for (let node = hit; node; node = node.parentNode) {
        if (node == element) {
            return true;
        }
    }
    return false;
};
// Return the "__elementId" of the focused element or of its nearest labelled ancestor,
// or null when neither carries a label.
let getFocusedElementId = function() {
    for (let node = document.activeElement; node; node = node.parentNode) {
        // Nodes without hasAttribute (such as the document itself) are skipped
        if (node.hasAttribute && node.hasAttribute("__elementId")) {
            return node.getAttribute("__elementId");
        }
    }
    return null;
};
// Return the element's innerText with surrounding whitespace removed,
// or "" when the element or its text is missing.
let trimmedInnerText = function(element) {
    const text = element ? element.innerText : null;
    return text ? text.trim() : "";
};
// Approximate the accessible name of an element. The first source that applies wins:
//   1. the text of the elements referenced by aria-labelledby,
//   2. the aria-label attribute,
//   3. the text of <label for="..."> elements that point at the element's id,
//   4. the text of a wrapping <label>,
//   5. the alt attribute, then the title attribute,
//   6. the element's own trimmed inner text.
let getApproximateAriaName = function(element) {
    // Check for aria labels
    if (element.hasAttribute("aria-labelledby")) {
        let buffer = "";
        let ids = element.getAttribute("aria-labelledby").split(" ");
        for (let i = 0; i < ids.length; i++) {
            let label = document.getElementById(ids[i]);
            if (label) {
                buffer = buffer + " " + trimmedInnerText(label);
            }
        }
        return buffer.trim();
    }

    if (element.hasAttribute("aria-label")) {
        return element.getAttribute("aria-label");
    }

    // Check for labels
    if (element.hasAttribute("id")) {
        let label_id = element.getAttribute("id");
        // The id comes from the page, so escape it before embedding it in a selector;
        // an unescaped quote would make querySelectorAll throw.
        let escaped_id = (window.CSS && typeof window.CSS.escape === "function")
            ? window.CSS.escape(label_id)
            : label_id.replace(/["\\]/g, "\\$&");
        let label = "";
        let labels = document.querySelectorAll('label[for="' + escaped_id + '"]');
        for (let j = 0; j < labels.length; j++) {
            label += labels[j].innerText + " ";
        }
        label = label.trim();
        if (label != "") {
            return label;
        }
    }

    if (element.parentElement && element.parentElement.tagName == "LABEL") {
        return element.parentElement.innerText;
    }

    // Check for alt text or titles
    if (element.hasAttribute("alt")) {
        return element.getAttribute("alt");
    }

    if (element.hasAttribute("title")) {
        return element.getAttribute("title");
    }

    return trimmedInnerText(element);
};
// Return [role, tag] for an element. `tag` is the lowercase tag name, extended to
// "input, type=<type>" for inputs with a type attribute. `role` is the explicit
// role attribute if present, otherwise the roleMapping entry for the tag, otherwise "".
let getApproximateAriaRole = function(element) {
    let tag = element.tagName.toLowerCase();
    if (tag == "input" && element.hasAttribute("type")) {
        tag = tag + ", type=" + element.getAttribute("type");
    }

    if (element.hasAttribute("role")) {
        return [element.getAttribute("role"), tag];
    }

    const mappedRole = (tag in roleMapping) ? roleMapping[tag] : "";
    return [mappedRole, tag];
};
// Label the interactive elements, then return an object keyed by "__elementId".
// Each record holds the element's tag name, role, approximate name, vertical
// scrollability and those client rects whose centre point hits the element.
// Elements without any such rect are left out.
let getInteractiveRects = function() {
    labelElements(getInteractiveElements());

    const labelled = document.querySelectorAll("[__elementId]");
    const results = {};

    for (let idx = 0; idx < labelled.length; idx++) {
        const elm = labelled[idx];
        const key = elm.getAttribute("__elementId");
        const clientRects = elm.getClientRects();
        const roleAndTag = getApproximateAriaRole(elm);
        const name = getApproximateAriaName(elm);
        // Scrollable when the content is at least one pixel taller than the box
        const vScrollable = elm.scrollHeight - elm.clientHeight >= 1;

        const visibleRects = [];
        for (const rect of clientRects) {
            const centerX = rect.left + rect.width/2;
            const centerY = rect.top + rect.height/2;
            if (isTopmost(elm, centerX, centerY)) {
                // Round-trip through JSON to obtain a plain object copy of the rect
                visibleRects.push(JSON.parse(JSON.stringify(rect)));
            }
        }

        if (visibleRects.length > 0) {
            results[key] = {
                "tag_name": roleAndTag[1],
                "role": roleAndTag[0],
                "aria-name": name,
                "v-scrollable": vScrollable,
                "rects": visibleRects
            };
        }
    }

    return results;
};
// Snapshot the visual viewport together with the document element's client and
// scroll sizes. Any value whose source object is unavailable is reported as 0.
let getVisualViewport = function() {
    const vv = window.visualViewport;
    const de = document.documentElement;

    // Read a property from an optional source object, defaulting to 0
    const read = function(source, key) {
        return source ? source[key] : 0;
    };

    return {
        "height": read(vv, "height"),
        "width": read(vv, "width"),
        "offsetLeft": read(vv, "offsetLeft"),
        "offsetTop": read(vv, "offsetTop"),
        "pageLeft": read(vv, "pageLeft"),
        "pageTop": read(vv, "pageTop"),
        "scale": read(vv, "scale"),
        "clientWidth": read(de, "clientWidth"),
        "clientHeight": read(de, "clientHeight"),
        "scrollWidth": read(de, "scrollWidth"),
        "scrollHeight": read(de, "scrollHeight")
    };
};
// Collect <meta> tags into an object keyed by their "name" attribute, or by
// "property" when there is no name. Tags with neither attribute, or without
// "content", are skipped; a later tag with the same key overwrites an earlier one.
let _getMetaTags = function() {
    const results = {};
    for (const tag of document.querySelectorAll("meta")) {
        let key;
        if (tag.hasAttribute("name")) {
            key = tag.getAttribute("name");
        } else if (tag.hasAttribute("property")) {
            key = tag.getAttribute("property");
        } else {
            continue;
        }

        if (tag.hasAttribute("content")) {
            results[key] = tag.getAttribute("content");
        }
    }
    return results;
};
// Return the trimmed source text of every <script type="application/ld+json"> element.
let _getJsonLd = function() {
    const scripts = document.querySelectorAll('script[type="application/ld+json"]');
    return Array.from(scripts, function(script) {
        return script.innerHTML.trim();
    });
};
// From: https://www.stevefenton.co.uk/blog/2022/12/parse-microdata-with-javascript/
// Extract HTML microdata: returns one object per [itemscope] element, holding its
// itemType plus the itemprop values found beneath it. Nested itemscope elements
// that also carry an itemprop become nested objects.
let _getMicrodata = function() {
    // Collapse each whitespace character to a space and trim the ends
    function sanitize(input) {
        return input.replace(/\s/gi, ' ').trim();
    }

    // Store `value` under `name`; repeated names accumulate into a flat array
    function addValue(information, name, value) {
        if (information[name]) {
            // Array.isArray is required here: typeof never yields 'array', so the
            // previous check always re-wrapped and nested the array on the third value
            if (Array.isArray(information[name])) {
                information[name].push(value);
            } else {
                const arr = [];
                arr.push(information[name]);
                arr.push(value);
                information[name] = arr;
            }
        } else {
            information[name] = value;
        }
    }

    // Walk the children of `item`, recording their properties into `information`
    function traverseItem(item, information) {
        const children = item.children;

        for (let i = 0; i < children.length; i++) {
            const child = children[i];

            if (child.hasAttribute('itemscope')) {
                // A nested item: only recorded when it is itself a property of the parent
                if (child.hasAttribute('itemprop')) {
                    const itemProp = child.getAttribute('itemprop');
                    const itemType = child.getAttribute('itemtype');

                    const childInfo = {
                        itemType: itemType
                    };

                    traverseItem(child, childInfo);

                    itemProp.split(' ').forEach(propName => {
                        addValue(information, propName, childInfo);
                    });
                }
            } else if (child.hasAttribute('itemprop')) {
                const itemProp = child.getAttribute('itemprop');
                itemProp.split(' ').forEach(propName => {
                    if (propName === 'url') {
                        addValue(information, propName, child.href);
                    } else {
                        addValue(information, propName, sanitize(child.getAttribute("content") || child.content || child.textContent || child.src || ""));
                    }
                });
                traverseItem(child, information);
            } else {
                traverseItem(child, information);
            }
        }
    }

    const microdata = [];

    document.querySelectorAll("[itemscope]").forEach(function(elem, i) {
        const itemType = elem.getAttribute('itemtype');
        const information = {
            itemType: itemType
        };
        traverseItem(elem, information);
        microdata.push(information);
    });

    return microdata;
};
// Gather the page's structured metadata. The result may contain:
//   "jsonld"    - parsed JSON-LD (see below),
//   "microdata" - the microdata items, when there are any,
//   "meta_tags" - the collected <meta> tags, when there are any.
let getPageMetadata = function() {
    let jsonld = _getJsonLd();
    let metaTags = _getMetaTags();
    let microdata = _getMicrodata();
    let results = {};

    if (jsonld.length === 1) {
        // A single script yields its parsed value, or the raw list if it is not valid JSON
        try {
            results["jsonld"] = JSON.parse(jsonld[0]);
        }
        catch (e) {
            results["jsonld"] = jsonld;
        }
    }
    else if (jsonld.length > 1) {
        // Parse each script on its own, so one malformed block does not discard the
        // others; entries that fail to parse are kept as raw strings
        results["jsonld"] = jsonld.map(function(raw) {
            try {
                return JSON.parse(raw);
            }
            catch (e) {
                return raw;
            }
        });
    }

    if (microdata.length > 0) {
        results["microdata"] = microdata;
    }

    // Object.keys avoids calling metaTags.hasOwnProperty, which a meta tag named
    // "hasOwnProperty" would have replaced with a string
    if (Object.keys(metaTags).length > 0) {
        results["meta_tags"] = metaTags;
    }

    return results;
};
// Return the text of all text nodes that have at least one non-empty client rect
// overlapping the viewport. A line break follows text whose parent's computed
// display is not "inline", "hidden" or "none"; blank lines are removed at the end.
let getVisibleText = function() {
    // Get the window's current viewport boundaries
    const viewportHeight = window.innerHeight || document.documentElement.clientHeight;
    const viewportWidth = window.innerWidth || document.documentElement.clientWidth;

    // True when the rect has an area and lies inside (or partially inside) the viewport
    const overlapsViewport = function(rect) {
        return rect.width > 0 &&
            rect.height > 0 &&
            rect.bottom >= 0 &&
            rect.right >= 0 &&
            rect.top <= viewportHeight &&
            rect.left <= viewportWidth;
    };

    const pieces = [];

    const walker = document.createTreeWalker(
        document.body,
        NodeFilter.SHOW_TEXT,
        null,
        false
    );

    for (let textNode = walker.nextNode(); textNode; textNode = walker.nextNode()) {
        // Create a range to retrieve bounding rectangles of the current text node
        const range = document.createRange();
        range.selectNodeContents(textNode);

        // One visible rect is enough to include the node
        if (!Array.from(range.getClientRects()).some(overlapsViewport)) {
            continue;
        }

        pieces.push(textNode.nodeValue.replace(/\s+/g, " "));

        // Is the parent a block element?
        const parent = textNode.parentNode;
        if (parent) {
            const display = window.getComputedStyle(parent).display;
            if (["inline", "hidden", "none"].indexOf(display) === -1) {
                pieces.push("\n");
            }
        }
    }

    // Remove blank lines from the collected text
    return pieces.join("").replace(/^\s*\n/gm, "").trim().replace(/\n+/g, "\n");
};
// Public API of the MultimodalWebSurfer object; everything else above stays private.
return {
getInteractiveRects: getInteractiveRects,
getVisualViewport: getVisualViewport,
getFocusedElementId: getFocusedElementId,
getPageMetadata: getPageMetadata,
getVisibleText: getVisibleText,
};
})();

View File

@@ -0,0 +1,578 @@
import asyncio
import base64
import io
import os
import random
import warnings
from types import ModuleType
from typing import Any, Callable, Dict, Optional, Tuple, Union, cast
from playwright._impl._errors import Error as PlaywrightError
from playwright._impl._errors import TimeoutError
from playwright.async_api import Download, Page
from ._types import (
InteractiveRegion,
VisualViewport,
interactiveregion_from_dict,
visualviewport_from_dict,
)
markitdown: ModuleType | None = None
try:
# Suppress warnings from markitdown -- which is pretty chatty
warnings.filterwarnings(action="ignore", module="markitdown")
import markitdown
except ImportError:
pass
class PlaywrightController:
"""
A helper class to allow Playwright to interact with web pages to perform actions such as clicking, filling, and scrolling.
Args:
downloads_folder (str | None): The folder to save downloads to. If None, downloads are not saved.
animate_actions (bool): Whether to animate the actions (create fake cursor to click).
viewport_width (int): The width of the viewport.
viewport_height (int): The height of the viewport.
_download_handler (Optional[Callable[[Download], None]]): A function to handle downloads.
to_resize_viewport (bool): Whether to resize the viewport
"""
def __init__(
    self,
    downloads_folder: str | None = None,
    animate_actions: bool = False,
    viewport_width: int = 1440,
    viewport_height: int = 900,
    _download_handler: Optional[Callable[[Download], None]] = None,
    to_resize_viewport: bool = True,
) -> None:
    """
    Check the arguments, store the settings and load the bundled page script.

    The script is read from ``page_script.js`` in this module's directory.
    """
    assert isinstance(animate_actions, bool)
    assert isinstance(viewport_width, int) and isinstance(viewport_height, int)
    assert viewport_height > 0 and viewport_width > 0

    # Settings supplied by the caller
    self.downloads_folder = downloads_folder
    self.animate_actions = animate_actions
    self.viewport_width = viewport_width
    self.viewport_height = viewport_height
    self.to_resize_viewport = to_resize_viewport
    self._download_handler = _download_handler

    # Internal state
    self._page_script: str = ""
    self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)
    self._markdown_converter: Optional[Any] | None = None

    # Read page_script
    module_dir = os.path.abspath(os.path.dirname(__file__))
    script_path = os.path.join(module_dir, "page_script.js")
    with open(script_path, "rt", encoding="utf-8") as script_file:
        self._page_script = script_file.read()
async def sleep(self, page: Page, duration: Union[int, float]) -> None:
    """
    Pause the execution for a specified duration.

    Args:
        page (Page): The Playwright page object.
        duration (Union[int, float]): The duration to sleep in seconds.
    """
    assert page is not None
    # wait_for_timeout expects milliseconds, so convert from seconds.
    await page.wait_for_timeout(duration * 1000)
async def get_interactive_rects(self, page: Page) -> Dict[str, InteractiveRegion]:
    """
    Read the interactive regions of the page through the page script.

    Args:
        page (Page): The Playwright page object.

    Returns:
        Dict[str, InteractiveRegion]: The typed regions, keyed by the identifier the script returns.
    """
    assert page is not None

    # Best effort: evaluate the page script; any failure is ignored.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    # Read the regions from the DOM
    raw = await page.evaluate("MultimodalWebSurfer.getInteractiveRects();")
    result = cast(Dict[str, Dict[str, Any]], raw)
    assert isinstance(result, dict)

    # Convert the results into appropriate types
    typed_results: Dict[str, InteractiveRegion] = {}
    for key, region in result.items():
        assert isinstance(key, str)
        typed_results[key] = interactiveregion_from_dict(region)
    return typed_results
async def get_visual_viewport(self, page: Page) -> VisualViewport:
    """
    Read the page's visual viewport through the page script.

    Args:
        page (Page): The Playwright page object.

    Returns:
        VisualViewport: The viewport built from the dictionary the page script returns.
    """
    assert page is not None

    # Evaluate the page script first; any exception raised here is ignored.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    raw_viewport = await page.evaluate("MultimodalWebSurfer.getVisualViewport();")
    return visualviewport_from_dict(raw_viewport)
async def get_focused_rect_id(self, page: Page) -> str | None:
    """
    Look up the identifier of the element that currently has focus.

    Args:
        page (Page): The Playwright page object.

    Returns:
        str | None: The identifier converted to a string, or None when the page script reports no
        focused element.
    """
    assert page is not None

    # Evaluate the page script first; any exception raised here is ignored.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    focused_id = await page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
    if focused_id is None:
        return None
    return str(focused_id)
async def get_page_metadata(self, page: Page) -> Dict[str, Any]:
    """
    Fetch the metadata dictionary that the page script extracts from the page.

    Args:
        page (Page): The Playwright page object.

    Returns:
        Dict[str, Any]: The metadata dictionary returned by the page script.
    """
    assert page is not None

    # Evaluate the page script first; any exception raised here is ignored.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    metadata = await page.evaluate("MultimodalWebSurfer.getPageMetadata();")
    assert isinstance(metadata, dict)
    return cast(Dict[str, Any], metadata)
async def on_new_page(self, page: Page) -> None:
    """
    Prepare a newly opened page for use by the controller.

    Registers the download handler when one was configured, optionally resizes the viewport,
    pauses briefly, installs ``page_script.js`` as an init script, and waits for the page's
    load state.

    Args:
        page (Page): The Playwright page object.
    """
    assert page is not None

    # The handler defaults to None in the constructor; only subscribe when one was supplied
    # instead of handing None to the event emitter.
    if self._download_handler is not None:
        page.on("download", self._download_handler)  # type: ignore

    if self.to_resize_viewport and self.viewport_width and self.viewport_height:
        await page.set_viewport_size({"width": self.viewport_width, "height": self.viewport_height})
    await self.sleep(page, 0.2)

    # page_script.js is expected to sit next to this module.
    script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
    await page.add_init_script(path=script_path)
    await page.wait_for_load_state()
async def back(self, page: Page) -> None:
    """
    Go back one entry in the page's navigation history.

    Args:
        page (Page): The Playwright page object.
    """
    assert page is not None
    await page.go_back()
async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
    """
    Navigate the page to a URL, falling back to a file download when navigation is aborted.

    When navigation fails with an error containing "net::ERR_ABORTED" and a downloads folder is
    configured, the URL is treated as a download: the file is saved into the downloads folder and
    the page is pointed at a small confirmation document.

    Args:
        page (Page): The Playwright page object.
        url (str): The URL to visit.

    Returns:
        Tuple[bool, bool]: ``(reset_prior_metadata_hash, reset_last_download)``. The first flag is
        True after a regular page load, the second is True after a download was saved.

    Raises:
        Exception: Any navigation error that does not qualify for the download fallback is
            re-raised unchanged.
    """
    from html import escape  # local import: only needed for the confirmation page

    assert page is not None
    reset_prior_metadata_hash = False
    reset_last_download = False
    try:
        # Regular webpage
        await page.goto(url)
        await page.wait_for_load_state()
        reset_prior_metadata_hash = True
    except Exception as e_outer:
        # Only an aborted navigation with a configured downloads folder is treated as a download.
        if not (self.downloads_folder and "net::ERR_ABORTED" in str(e_outer)):
            raise

        # Downloaded file
        async with page.expect_download() as download_info:
            try:
                await page.goto(url)
            except Exception as e_inner:
                # A repeated abort is tolerated here; anything else is a real error.
                if "net::ERR_ABORTED" not in str(e_inner):
                    raise
            download = await download_info.value
            # The suggested filename is derived from data the remote side controls, so keep only
            # its final path component before joining it onto the downloads folder.
            safe_name = os.path.basename(download.suggested_filename) or "download"
            fname = os.path.join(self.downloads_folder, safe_name)
            await download.save_as(fname)
            # Escape both values so markup characters in a filename cannot alter the page.
            message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{escape(safe_name)}' to local path:<br><br>{escape(fname)}</h1></body>"
            await page.goto(
                "data:text/html;base64," + base64.b64encode(message.encode("utf-8")).decode("utf-8")
            )
            reset_last_download = True
    return reset_prior_metadata_hash, reset_last_download
async def page_down(self, page: Page) -> None:
    """
    Scroll the window down by the configured viewport height less 50 pixels.

    Args:
        page (Page): The Playwright page object.
    """
    assert page is not None
    distance = self.viewport_height - 50
    await page.evaluate(f"window.scrollBy(0, {distance});")
async def page_up(self, page: Page) -> None:
    """
    Scroll the window up by the configured viewport height less 50 pixels.

    Args:
        page (Page): The Playwright page object.
    """
    assert page is not None
    distance = self.viewport_height - 50
    await page.evaluate(f"window.scrollBy(0, -{distance});")
async def gradual_cursor_animation(
    self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
) -> None:
    """
    Slide the on-page ``red-cursor`` element from a start point towards an end point.

    The element is repositioned 20 times, 0.05 seconds apart, at the fractions 0/20 through
    19/20 of the way from start to end. The end point is then stored in
    ``last_cursor_position``.

    Args:
        page (Page): The Playwright page object.
        start_x (float): The starting x-coordinate.
        start_y (float): The starting y-coordinate.
        end_x (float): The ending x-coordinate.
        end_y (float): The ending y-coordinate.
    """
    # animation helper
    steps = 20
    delta_x = end_x - start_x
    delta_y = end_y - start_y
    for step in range(steps):
        fraction = step / steps
        x = start_x + delta_x * fraction
        y = start_y + delta_y * fraction
        # NOTE(review): the final frame lands at 19/20 of the path, not on the end point itself.
        move_script = f"""
(function() {{
let cursor = document.getElementById('red-cursor');
cursor.style.left = '{x}px';
cursor.style.top = '{y}px';
}})();
"""
        await page.evaluate(move_script)
        await asyncio.sleep(0.05)
    self.last_cursor_position = (end_x, end_y)
async def add_cursor_box(self, page: Page, identifier: str) -> None:
    """
    Highlight the identified element with a red border and add a red cursor dot to the page.

    The border is applied first, followed by a 0.3 second pause, and then a 10x10 pixel red
    element with id ``red-cursor`` is appended to the document body.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.
    """
    # animation helper
    await page.evaluate(f"""
(function() {{
let elm = document.querySelector("[__elementId='{identifier}']");
if (elm) {{
elm.style.transition = 'border 0.3s ease-in-out';
elm.style.border = '2px solid red';
}}
}})();
""")
    await asyncio.sleep(0.3)
    # Create a red cursor.
    # - 'fixed' positioning: the cursor is later placed with coordinates taken from
    #   bounding_box(), which are relative to the viewport, so it must not scroll with the page.
    # - pointer-events 'none': the dot is purely decorative and must never intercept the
    #   mouse click or move that is dispatched at the same coordinates.
    await page.evaluate("""
(function() {
let cursor = document.createElement('div');
cursor.id = 'red-cursor';
cursor.style.width = '10px';
cursor.style.height = '10px';
cursor.style.backgroundColor = 'red';
cursor.style.position = 'fixed';
cursor.style.pointerEvents = 'none';
cursor.style.borderRadius = '50%';
cursor.style.zIndex = '10000';
document.body.appendChild(cursor);
})();
""")
async def remove_cursor_box(self, page: Page, identifier: str) -> None:
    """
    Undo the highlight: clear the element's inline border and delete the red cursor dot.

    Both steps are skipped silently when the respective element is not found.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.
    """
    # Remove the highlight and cursor
    cleanup_script = f"""
(function() {{
let elm = document.querySelector("[__elementId='{identifier}']");
if (elm) {{
elm.style.border = '';
}}
let cursor = document.getElementById('red-cursor');
if (cursor) {{
cursor.remove();
}}
}})();
"""
    await page.evaluate(cleanup_script)
async def click_id(self, page: Page, identifier: str) -> Page | None:
    """
    Click the centre of the element tagged with the given identifier.

    The element is scrolled into view and clicked with the mouse at the centre of its bounding
    box. When ``animate_actions`` is set, the element is highlighted and the red cursor is
    animated to the click point beforehand, and the highlight is removed afterwards.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.

    Returns:
        Page | None: The popup page if the click opened one within the one-second wait,
        otherwise None.

    Raises:
        ValueError: If waiting for the element times out.
    """
    assert page is not None
    popup: Page | None = None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Bring it into view and measure it
    await target.scroll_into_view_if_needed()
    await asyncio.sleep(0.3)
    box = cast(Dict[str, Union[int, float]], await target.bounding_box())

    animate = self.animate_actions
    if animate:
        await self.add_cursor_box(page, identifier)

    click_x = box["x"] + box["width"] / 2
    click_y = box["y"] + box["height"] / 2

    if animate:
        # Move cursor to the box slowly
        start_x, start_y = self.last_cursor_position
        await self.gradual_cursor_animation(page, start_x, start_y, click_x, click_y)
        await asyncio.sleep(0.1)

    try:
        # Give it a chance to open a new page
        async with page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
            await page.mouse.click(click_x, click_y, delay=10)
            popup = await page_info.value  # type: ignore
            assert isinstance(popup, Page)
            await self.on_new_page(popup)
    except TimeoutError:
        # No popup appeared within the wait; the click stayed on the current page.
        pass

    if animate:
        await self.remove_cursor_box(page, identifier)
    return popup  # type: ignore
async def hover_id(self, page: Page, identifier: str) -> None:
    """
    Move the mouse to the centre of the element tagged with the given identifier.

    When ``animate_actions`` is set, the element is highlighted and the red cursor is animated
    to the element before the move, and the highlight is removed afterwards.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.

    Raises:
        ValueError: If waiting for the element times out.
    """
    assert page is not None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Bring it into view and measure it
    await target.scroll_into_view_if_needed()
    await asyncio.sleep(0.3)
    box = cast(Dict[str, Union[int, float]], await target.bounding_box())

    animate = self.animate_actions
    if animate:
        await self.add_cursor_box(page, identifier)
        # Move cursor to the box slowly
        start_x, start_y = self.last_cursor_position
        end_x, end_y = box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
        await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
        await asyncio.sleep(0.1)

    # Hover over it
    await page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)

    if animate:
        await self.remove_cursor_box(page, identifier)
async def fill_id(self, page: Page, identifier: str, value: str, press_enter: bool = True) -> None:
    """
    Enter a value into the element tagged with the given identifier.

    Without animation the value is set with ``fill``, falling back to typing it key by key if
    that raises a Playwright error. With ``animate_actions`` set, the element is highlighted,
    the red cursor is animated to it, and the value is typed key by key with a per-key delay.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.
        value (str): The value to fill.
        press_enter (bool): Whether to press Enter on the element after the value is entered.

    Raises:
        ValueError: If waiting for the element times out.
    """
    assert page is not None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Bring it into view and measure it
    await target.scroll_into_view_if_needed()
    box = cast(Dict[str, Union[int, float]], await target.bounding_box())

    animate = self.animate_actions
    if animate:
        await self.add_cursor_box(page, identifier)
        # Move cursor to the box slowly
        start_x, start_y = self.last_cursor_position
        end_x, end_y = box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
        await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
        await asyncio.sleep(0.1)

    # Focus on the element
    await target.focus()

    if animate:
        # Short text gets a randomised per-key delay; text of 100+ characters uses a fixed short delay.
        delay_typing_speed = 50 + 100 * random.random() if len(value) < 100 else 10
        await target.press_sequentially(value, delay=delay_typing_speed)
    else:
        try:
            await target.fill(value)
        except PlaywrightError:
            # fill() failed on this element; type the value key by key instead.
            await target.press_sequentially(value)

    if press_enter:
        await target.press("Enter")

    if animate:
        await self.remove_cursor_box(page, identifier)
async def scroll_id(self, page: Page, identifier: str, direction: str) -> None:
    """
    Scroll inside the element tagged with the given identifier by one client height.

    The scroll position is clamped to the element's scrollable range. Nothing happens when the
    element is not found.

    Args:
        page (Page): The Playwright page object.
        identifier (str): The element identifier.
        direction (str): "up" scrolls up; any other value scrolls down.
    """
    assert page is not None
    scroll_script = f"""
(function() {{
let elm = document.querySelector("[__elementId='{identifier}']");
if (elm) {{
if ("{direction}" == "up") {{
elm.scrollTop = Math.max(0, elm.scrollTop - elm.clientHeight);
}}
else {{
elm.scrollTop = Math.min(elm.scrollHeight - elm.clientHeight, elm.scrollTop + elm.clientHeight);
}}
}}
}})();
"""
    await page.evaluate(scroll_script)
async def get_webpage_text(self, page: Page, n_lines: int = 50) -> str:
    """
    Return the leading part of the page body's inner text with blank lines removed.

    The first ``n_lines`` lines of ``document.body.innerText`` are taken and then the
    whitespace-only lines among them are dropped, so fewer than ``n_lines`` lines may be returned.

    Args:
        page (Page): The Playwright page object.
        n_lines (int): The number of leading lines of the inner text to consider.

    Returns:
        str: The remaining lines joined with newlines, or an empty string if anything fails.
    """
    assert page is not None
    try:
        body_text = await page.evaluate("""() => {
return document.body.innerText;
}""")
        leading_lines = body_text.split("\n")[:n_lines]
        # remove empty lines
        kept_lines = [line for line in leading_lines if line.strip()]
        page_text = "\n".join(kept_lines)
        assert isinstance(page_text, str)
        return page_text
    except Exception:
        # Any failure (evaluation error, unexpected result type) yields an empty text.
        return ""
async def get_visible_text(self, page: Page) -> str:
    """
    Return the text the page script reports as visible in the browser viewport (approximately).

    Args:
        page (Page): The Playwright page object.

    Returns:
        str: The visible text as returned by the page script.
    """
    assert page is not None

    # Evaluate the page script first; any exception raised here is ignored.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    visible_text = await page.evaluate("MultimodalWebSurfer.getVisibleText();")
    assert isinstance(visible_text, str)
    return visible_text
async def get_page_markdown(self, page: Page) -> str:
    """
    Return the content of the web page converted to markdown.

    The page's outer HTML is converted with a MarkItDown converter, which is created on first use
    and then reused. When no converter is available (``markitdown`` is None), the method falls
    back to the first 200 lines of plain page text.

    Args:
        page (Page): The Playwright page object.

    Returns:
        str: The markdown content of the page, or plain page text as a fallback.
    """
    assert page is not None

    # Create the converter lazily, once.
    if self._markdown_converter is None and markitdown is not None:
        self._markdown_converter = markitdown.MarkItDown()

    # Decide on the fallback separately from the creation above, so that an already-created
    # converter keeps being used on every call rather than only on the one that created it.
    if self._markdown_converter is None:
        return await self.get_webpage_text(page, n_lines=200)

    html = await page.evaluate("document.documentElement.outerHTML;")
    res = self._markdown_converter.convert_stream(
        io.BytesIO(html.encode("utf-8")), file_extension=".html", url=page.url
    )
    assert hasattr(res, "text_content") and isinstance(res.text_content, str)
    return res.text_content