diff --git a/docs/05_bring_your_own_model_provider.md b/docs/05_bring_your_own_model_provider.md index 04f17d48..071394ea 100644 --- a/docs/05_bring_your_own_model_provider.md +++ b/docs/05_bring_your_own_model_provider.md @@ -175,6 +175,72 @@ class MyVlmProvider(VlmProvider): --- +## Image Scaling + +Before a screenshot is sent to a model, it is preprocessed by an **image scaler**. The scaler resizes the image to match the model's optimal input resolution, which affects both token cost and coordinate precision. + +All scalers inherit from `ImageScaler`: + +| Class | Behaviour | Used by | +|-------|-----------|---------| +| `PatchOptimizedImageScaler` | Finds the largest aspect-preserving size within a patch-based token budget (`max_edge`, `max_tokens`, `patch_size`) | `AskUIVlmProvider`, `AnthropicVlmProvider`, `OpenAIVlmProvider` | +| `ContainedImageScaler` | Fits within `max_width` × `max_height` bounds | Default in `VlmProvider` base class | + +### Configuring the Maximum Image Edge + +All built-in providers accept an `image_edge_max` parameter that controls the maximum pixel dimension of screenshots sent to the model. You can also set it via the `ASKUI_VLM_MAX_IMAGE_EDGE` environment variable: + +``` +ASKUI_VLM_MAX_IMAGE_EDGE=1568 +``` + +Or pass it directly: + +```python +from askui import AgentSettings, ComputerAgent +from askui.model_providers import AnthropicVlmProvider + +with ComputerAgent(settings=AgentSettings( + vlm_provider=AnthropicVlmProvider(image_edge_max=1568), +)) as agent: + agent.act("Open settings") +``` + +### Using a Custom Image Scaler + +To fully replace the scaling strategy, pass an `image_scaler` instance. 
When provided, `image_edge_max` is ignored: + +```python +from askui import AgentSettings, ComputerAgent +from askui.model_providers import ( + AnthropicVlmProvider, + ContainedImageScaler, +) + +with ComputerAgent(settings=AgentSettings( + vlm_provider=AnthropicVlmProvider( + image_scaler=ContainedImageScaler(max_width=1280, max_height=720), + ), +)) as agent: + agent.act("Open settings") +``` + +### Implementing a Custom Image Scaler + +For fully custom scaling logic, subclass `ImageScaler`: + +```python +from PIL import Image +from askui.model_providers import ImageScaler + +class MyImageScaler(ImageScaler): + def __call__(self, image: Image.Image) -> Image.Image: + # Your custom scaling logic + return image.resize((1024, 768), Image.Resampling.LANCZOS) +``` + +--- + ## Advanced: Injecting a Custom Client For full control over HTTP settings (timeouts, proxies, retries), you can inject a pre-configured client: diff --git a/docs/07_tools.md b/docs/07_tools.md index ffb8552c..737c10f6 100644 --- a/docs/07_tools.md +++ b/docs/07_tools.md @@ -181,6 +181,18 @@ A tool’s __call__ method may return: - None - a list or tuple containing any of the above +**Image size limit:** When a tool returns a `PIL.Image.Image`, it is the tool's responsibility to ensure the image does not exceed **2000×2000 px** (longest side ≤ 2000 px). The Claude API enforces a 2000×2000 px per-image limit when more than 20 images are sent in a single request, which is common in agentic loops. Use `downscale_image()` from `askui.utils.llm_image_utils` to downscale images that may be too large: + +```python +from PIL import Image +from askui.utils.llm_image_utils import downscale_image + +image: Image.Image = ... # your image +image = downscale_image(image, max_dimension=2000) +``` + +This preserves the original aspect ratio and only downscales images whose longest side exceeds the limit. 
+ ### Complete Example Here’s a greeting tool that demonstrates all the key concepts: diff --git a/src/askui/android_agent.py b/src/askui/android_agent.py index 98b79143..29b96a15 100644 --- a/src/askui/android_agent.py +++ b/src/askui/android_agent.py @@ -87,7 +87,6 @@ def __init__( ) -> None: reporter = CompositeReporter(reporters=reporters) self.os = PpadbAgentOs(device_identifier=device, reporter=reporter) - self.act_agent_os_facade = AndroidAgentOsFacade(self.os) super().__init__( reporter=reporter, retry=retry, @@ -97,6 +96,11 @@ def __init__( callbacks=callbacks, truncation_strategy=truncation_strategy, ) + self.act_agent_os_facade = AndroidAgentOsFacade( + self.os, + coordinate_space=self._vlm_provider.coordinate_space, + image_scaler=self._vlm_provider.image_scaler, + ) self.act_tool_collection.add_agent_os(self.act_agent_os_facade) # Override default act settings with Android-specific settings self.act_settings = ActSettings( diff --git a/src/askui/computer_agent.py b/src/askui/computer_agent.py index ad0a6627..7f121dbf 100644 --- a/src/askui/computer_agent.py +++ b/src/askui/computer_agent.py @@ -130,7 +130,9 @@ def __init__( truncation_strategy=truncation_strategy, ) self.act_agent_os_facade: ComputerAgentOsFacade = ComputerAgentOsFacade( - self.tools.os + self.tools.os, + coordinate_space=self._vlm_provider.coordinate_space, + image_scaler=self._vlm_provider.image_scaler, ) self.act_tool_collection.add_agent_os(self.act_agent_os_facade) # Override default act settings with computer-specific settings diff --git a/src/askui/model_providers/__init__.py b/src/askui/model_providers/__init__.py index ae1f0d0d..5d6a034e 100644 --- a/src/askui/model_providers/__init__.py +++ b/src/askui/model_providers/__init__.py @@ -35,6 +35,17 @@ from askui.model_providers.openai_image_qa_provider import OpenAIImageQAProvider from askui.model_providers.openai_vlm_provider import OpenAIVlmProvider from askui.model_providers.vlm_provider import VlmProvider +from 
askui.models.shared.coordinate_space import ( + NormalizedCoordinateSpace, + PixelCoordinateSpace, + ScaledCoordinateSpace, + VlmCoordinateSpace, +) +from askui.models.shared.image_scaler import ( + ContainedImageScaler, + ImageScaler, + PatchOptimizedImageScaler, +) from askui.utils.model_pricing import ModelPricing __all__ = [ @@ -46,11 +57,18 @@ "DetectionProvider", "GoogleImageQAProvider", "ImageQAProvider", + "ContainedImageScaler", + "ImageScaler", "ModelPricing", + "PatchOptimizedImageScaler", + "NormalizedCoordinateSpace", "OllamaImageQAProvider", "OllamaVlmProvider", + "OpenAICompatibleVlmProvider", "OpenAIImageQAProvider", "OpenAIVlmProvider", - "OpenAICompatibleVlmProvider", + "PixelCoordinateSpace", + "ScaledCoordinateSpace", + "VlmCoordinateSpace", "VlmProvider", ] diff --git a/src/askui/model_providers/anthropic_vlm_provider.py b/src/askui/model_providers/anthropic_vlm_provider.py index 9edd42b9..e160bd1e 100644 --- a/src/askui/model_providers/anthropic_vlm_provider.py +++ b/src/askui/model_providers/anthropic_vlm_provider.py @@ -14,11 +14,13 @@ ThinkingConfigParam, ToolChoiceParam, ) +from askui.models.shared.image_scaler import ImageScaler, PatchOptimizedImageScaler from askui.models.shared.prompts import SystemPrompt from askui.models.shared.tools import ToolCollection from askui.utils.model_pricing import ModelPricing _DEFAULT_MODEL_ID = "claude-sonnet-4-6" +_DEFAULT_MAX_IMAGE_EDGE = 1024 class AnthropicVlmProvider(VlmProvider): @@ -46,6 +48,13 @@ class AnthropicVlmProvider(VlmProvider): cost in USD per 1M output tokens. cache_write_cost_per_million_tokens (float | None, optional): Override cost in USD per 1M cache write input tokens. + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, uses Anthropic-optimized patch-based scaling + controlled by ``image_edge_max``. + image_edge_max (int | None, optional): Maximum edge length (in pixels) + for screenshots sent to the model. 
Only used when ``image_scaler`` + is not provided. Reads ``ASKUI_VLM_MAX_IMAGE_EDGE`` from the + environment if not provided. Defaults to 1024. cache_read_cost_per_million_tokens (float | None, optional): Override cost in USD per 1M cache read input tokens. @@ -70,6 +79,8 @@ def __init__( auth_token: str | None = None, model_id: str | None = None, client: Anthropic | None = None, + image_scaler: ImageScaler | None = None, + image_edge_max: int | None = None, input_cost_per_million_tokens: float | None = None, output_cost_per_million_tokens: float | None = None, cache_write_cost_per_million_tokens: float | None = None, @@ -78,6 +89,14 @@ def __init__( self._model_id_value = ( model_id or os.environ.get("VLM_PROVIDER_MODEL_ID") or _DEFAULT_MODEL_ID ) + resolved_edge_max = ( + image_edge_max + or int(os.environ.get("ASKUI_VLM_MAX_IMAGE_EDGE", "0")) + or _DEFAULT_MAX_IMAGE_EDGE + ) + self._image_scaler = image_scaler or PatchOptimizedImageScaler( + max_edge=resolved_edge_max + ) if client is not None: self.client = client else: @@ -104,6 +123,11 @@ def model_id(self) -> str: def pricing(self) -> ModelPricing | None: return self._pricing + @property + @override + def image_scaler(self) -> ImageScaler: + return self._image_scaler + @cached_property def _messages_api(self) -> AnthropicMessagesApi: """Lazily initialise the AnthropicMessagesApi on first use.""" diff --git a/src/askui/model_providers/askui_vlm_provider.py b/src/askui/model_providers/askui_vlm_provider.py index d149deff..0337b79b 100644 --- a/src/askui/model_providers/askui_vlm_provider.py +++ b/src/askui/model_providers/askui_vlm_provider.py @@ -15,10 +15,12 @@ ThinkingConfigParam, ToolChoiceParam, ) +from askui.models.shared.image_scaler import ImageScaler, PatchOptimizedImageScaler from askui.models.shared.prompts import SystemPrompt from askui.models.shared.tools import ToolCollection _DEFAULT_MODEL_ID = "claude-sonnet-4-6" +_DEFAULT_MAX_IMAGE_EDGE = 1024 class AskUIVlmProvider(VlmProvider): @@ -29,14 
+31,21 @@ class AskUIVlmProvider(VlmProvider): on the first API call, not at construction time. Args: - workspace_id (str | None, optional): AskUI workspace ID. Reads - `ASKUI_WORKSPACE_ID` from the environment if not provided. - token (str | None, optional): AskUI API token. Reads `ASKUI_TOKEN` - from the environment if not provided. - model_id (str, optional): Claude model to use. Defaults to - `"claude-sonnet-4-6"`. - client (Anthropic | None, optional): Pre-configured Anthropic client. - If provided, `workspace_id` and `token` are ignored. + askui_settings (`AskUiInferenceApiSettings` | None, optional): + Connection settings (workspace ID, token, base URL). Reads + from environment variables if not provided. + model_id (str | None, optional): Claude model to use. Defaults to + ``"claude-sonnet-4-6"``. + client (`Anthropic` | None, optional): Pre-configured Anthropic client. + If provided, ``askui_settings`` is only used for the base URL. + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, uses Anthropic-optimized patch-based scaling + controlled by ``image_edge_max``. + image_edge_max (int | None, optional): Maximum edge length (in pixels) + for screenshots sent to the model. Only used when ``image_scaler`` + is not provided. Reads ``ASKUI_VLM_MAX_IMAGE_EDGE`` from the + environment if not provided. Defaults to 1024. 
+ Example: ```python from askui import AgentSettings, ComputerAgent @@ -44,8 +53,6 @@ class AskUIVlmProvider(VlmProvider): agent = ComputerAgent(settings=AgentSettings( vlm_provider=AskUIVlmProvider( - workspace_id="my-workspace", - token="my-token", model_id="claude-opus-4-6-20260401", ) )) @@ -57,18 +64,33 @@ def __init__( askui_settings: AskUiInferenceApiSettings | None = None, model_id: str | None = None, client: Anthropic | None = None, + image_scaler: ImageScaler | None = None, + image_edge_max: int | None = None, ) -> None: self._askui_settings = askui_settings or AskUiInferenceApiSettings() self._model_id_value = ( model_id or os.environ.get("VLM_PROVIDER_MODEL_ID") or _DEFAULT_MODEL_ID ) self._injected_client = client + resolved_edge_max = ( + image_edge_max + or int(os.environ.get("ASKUI_VLM_MAX_IMAGE_EDGE", "0")) + or _DEFAULT_MAX_IMAGE_EDGE + ) + self._image_scaler = image_scaler or PatchOptimizedImageScaler( + max_edge=resolved_edge_max + ) @property @override def model_id(self) -> str: return self._model_id_value + @property + @override + def image_scaler(self) -> ImageScaler: + return self._image_scaler + @cached_property def _messages_api(self) -> AnthropicMessagesApi: """Lazily initialise the AnthropicMessagesApi on first use.""" diff --git a/src/askui/model_providers/ollama_vlm_provider.py b/src/askui/model_providers/ollama_vlm_provider.py index e06fa408..91e19b93 100644 --- a/src/askui/model_providers/ollama_vlm_provider.py +++ b/src/askui/model_providers/ollama_vlm_provider.py @@ -1,12 +1,23 @@ """OllamaVlmProvider — VLM access via a local Ollama instance.""" from openai import OpenAI +from typing_extensions import override from askui.model_providers.openai_vlm_provider import OpenAIVlmProvider +from askui.models.shared.coordinate_space import ( + PixelCoordinateSpace, + ScaledCoordinateSpace, + VlmCoordinateSpace, +) +from askui.models.shared.image_scaler import ImageScaler _DEFAULT_BASE_URL = "http://localhost:11434/v1" _DEFAULT_MODEL_ID = 
"qwen3.5" +_QWEN_COORDINATE_SPACE = ScaledCoordinateSpace(width=1000, height=1000) +_HOLO_COORDINATE_SPACE = ScaledCoordinateSpace(width=1000, height=1000) +_KIMI_COORDINATE_SPACE = ScaledCoordinateSpace(width=1000, height=1000) + class OllamaVlmProvider(OpenAIVlmProvider): """VLM provider that routes requests to a local Ollama instance. @@ -14,6 +25,11 @@ class OllamaVlmProvider(OpenAIVlmProvider): Thin convenience wrapper around `OpenAIVlmProvider` with Ollama defaults (``base_url``, ``api_key``, ``model_id``). + Qwen and Holo models are automatically detected and their coordinate + space is set to ``ScaledCoordinateSpace(width=1000, height=1000)``. + Kimi models use ``NormalizedCoordinateSpace()``. + Pass ``coordinate_space`` explicitly to override auto-detection. + Args: model_id (str, optional): Ollama model to use. Defaults to ``"qwen3.5"``. @@ -21,6 +37,18 @@ class OllamaVlmProvider(OpenAIVlmProvider): API. Defaults to ``"http://localhost:11434/v1"``. client (`OpenAI` | None, optional): Pre-configured OpenAI client. If provided, ``base_url`` is ignored. + coordinate_space (VlmCoordinateSpace | None, optional): The coordinate + grid the model emits coordinates in. ``None`` (the default) + enables auto-detection based on ``model_id``. + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, inherits from `OpenAIVlmProvider`. + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, inherits from `OpenAIVlmProvider`. + image_edge_max (int | None, optional): Maximum edge length (in pixels) + for screenshots sent to the model. Only used when ``image_scaler`` + is not provided. Reads ``ASKUI_VLM_MAX_IMAGE_EDGE`` from the + environment if not provided. Inherits the default from + `OpenAIVlmProvider` (1024). 
Example: ```python @@ -40,10 +68,31 @@ def __init__( model_id: str = _DEFAULT_MODEL_ID, base_url: str = _DEFAULT_BASE_URL, client: OpenAI | None = None, + coordinate_space: VlmCoordinateSpace | None = None, + image_scaler: ImageScaler | None = None, + image_edge_max: int | None = None, ) -> None: + self._coordinate_space_override = coordinate_space super().__init__( model_id=model_id, api_key="ollama", # Ollama requires no auth; OpenAI SDK needs a value base_url=base_url, client=client, + coordinate_space=coordinate_space or PixelCoordinateSpace(), + image_scaler=image_scaler, + image_edge_max=image_edge_max, ) + + @property + @override + def coordinate_space(self) -> VlmCoordinateSpace: + if self._coordinate_space_override is not None: + return self._coordinate_space_override + model_lower = self._model_id_value.lower() + if "qwen" in model_lower: + return _QWEN_COORDINATE_SPACE + if "holo" in model_lower: + return _HOLO_COORDINATE_SPACE + if "kimi" in model_lower: + return _KIMI_COORDINATE_SPACE + return self._coordinate_space diff --git a/src/askui/model_providers/openai_compatible_vlm_provider.py b/src/askui/model_providers/openai_compatible_vlm_provider.py index aae55c11..2f6fb9f9 100644 --- a/src/askui/model_providers/openai_compatible_vlm_provider.py +++ b/src/askui/model_providers/openai_compatible_vlm_provider.py @@ -4,6 +4,13 @@ from openai import OpenAI from askui.model_providers.openai_vlm_provider import OpenAIVlmProvider +from askui.models.shared.coordinate_space import ( + PixelCoordinateSpace, + VlmCoordinateSpace, +) +from askui.models.shared.image_scaler import ImageScaler + +_DEFAULT_COORDINATE_SPACE = PixelCoordinateSpace() class OpenAICompatibleVlmProvider(OpenAIVlmProvider): @@ -20,6 +27,16 @@ class OpenAICompatibleVlmProvider(OpenAIVlmProvider): (e.g. ``"https://my-host/v1/chat/completions"``). model_id (str): Model name expected by the deployment. api_key (str | None, optional): API key for the endpoint. 
+ coordinate_space (`VlmCoordinateSpace` | None, optional): The coordinate + grid the model emits coordinates in. If ``None``, inherits the + default from `OpenAIVlmProvider` (pixel coordinates). + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, inherits from `OpenAIVlmProvider`. + image_edge_max (int | None, optional): Maximum edge length (in pixels) + for screenshots sent to the model. Only used when ``image_scaler`` + is not provided. Reads ``ASKUI_VLM_MAX_IMAGE_EDGE`` from the + environment if not provided. Inherits the default from + `OpenAIVlmProvider` (1024). Example: ```python @@ -41,6 +58,9 @@ def __init__( endpoint_url: str, model_id: str | None = None, api_key: str | None = None, + coordinate_space: VlmCoordinateSpace = _DEFAULT_COORDINATE_SPACE, + image_scaler: ImageScaler | None = None, + image_edge_max: int | None = None, ) -> None: def _rewrite_url(request: httpx.Request) -> None: request.url = httpx.URL(endpoint_url) @@ -56,4 +76,7 @@ def _rewrite_url(request: httpx.Request) -> None: super().__init__( model_id=model_id, client=client, + coordinate_space=coordinate_space, + image_scaler=image_scaler, + image_edge_max=image_edge_max, ) diff --git a/src/askui/model_providers/openai_vlm_provider.py b/src/askui/model_providers/openai_vlm_provider.py index 47475cc7..789ee573 100644 --- a/src/askui/model_providers/openai_vlm_provider.py +++ b/src/askui/model_providers/openai_vlm_provider.py @@ -14,11 +14,18 @@ ThinkingConfigParam, ToolChoiceParam, ) +from askui.models.shared.coordinate_space import ( + PixelCoordinateSpace, + VlmCoordinateSpace, +) +from askui.models.shared.image_scaler import ImageScaler, PatchOptimizedImageScaler from askui.models.shared.prompts import SystemPrompt from askui.models.shared.tools import ToolCollection from askui.utils.model_pricing import ModelPricing _DEFAULT_MODEL_ID = "gpt-5.4" +_DEFAULT_COORDINATE_SPACE = PixelCoordinateSpace() +_DEFAULT_MAX_IMAGE_EDGE = 1024 class 
OpenAIVlmProvider(VlmProvider): @@ -36,6 +43,16 @@ class OpenAIVlmProvider(VlmProvider): to the OpenAI API (``https://api.openai.com/v1``). client (`OpenAI` | None, optional): Pre-configured OpenAI client. If provided, ``api_key`` and ``base_url`` are ignored. + coordinate_space (VlmCoordinateSpace, optional): The coordinate grid + the model emits coordinates in. Defaults to the screenshot + resolution (native pixel coordinates). + image_scaler (`ImageScaler` | None, optional): Custom image preprocessing + callable. If ``None``, uses patch-based scaling controlled by + ``image_edge_max``. + image_edge_max (int | None, optional): Maximum edge length (in pixels) + for screenshots sent to the model. Only used when ``image_scaler`` + is not provided. Reads ``ASKUI_VLM_MAX_IMAGE_EDGE`` from the + environment if not provided. Defaults to 1024. Example: ```python @@ -57,6 +74,9 @@ def __init__( api_key: str | None = None, base_url: str | None = None, client: OpenAI | None = None, + coordinate_space: VlmCoordinateSpace = _DEFAULT_COORDINATE_SPACE, + image_scaler: ImageScaler | None = None, + image_edge_max: int | None = None, input_cost_per_million_tokens: float | None = None, output_cost_per_million_tokens: float | None = None, cache_write_cost_per_million_tokens: float | None = None, @@ -65,6 +85,17 @@ def __init__( self._model_id_value = ( model_id or os.environ.get("VLM_PROVIDER_MODEL_ID") or _DEFAULT_MODEL_ID ) + self._coordinate_space = coordinate_space + resolved_edge_max = ( + image_edge_max + or int(os.environ.get("ASKUI_VLM_MAX_IMAGE_EDGE", "0")) + or _DEFAULT_MAX_IMAGE_EDGE + ) + self._image_scaler = image_scaler or PatchOptimizedImageScaler( + max_edge=resolved_edge_max, + max_tokens=1536, + patch_size=32, + ) if client is not None: self._client = client else: @@ -86,16 +117,32 @@ def __init__( def model_id(self) -> str: return self._model_id_value + @property + @override + def coordinate_space(self) -> VlmCoordinateSpace: + return self._coordinate_space + 
@property @override def pricing(self) -> ModelPricing | None: return self._pricing + @property + @override + def image_scaler(self) -> ImageScaler: + return self._image_scaler + @cached_property def _messages_api(self) -> OpenAIMessagesApi: """Lazily initialise the `OpenAIMessagesApi` on first use.""" return OpenAIMessagesApi(client=self._client) + @override + def augment_system_prompt(self, system: SystemPrompt) -> SystemPrompt: + """Append coordinate and resolution info to the system prompt.""" + coord_info = self.coordinate_space.build_prompt_section() + return SystemPrompt(prompt=f"{str(system)}\n\n{coord_info}") + @override def create_message( self, @@ -108,6 +155,8 @@ def create_message( temperature: float | None = None, provider_options: dict[str, Any] | None = None, ) -> MessageParam: + if system is not None: + system = self.augment_system_prompt(system) return self._messages_api.create_message( messages=messages, model_id=self._model_id_value, diff --git a/src/askui/model_providers/vlm_provider.py b/src/askui/model_providers/vlm_provider.py index 1e98b972..b526ea6a 100644 --- a/src/askui/model_providers/vlm_provider.py +++ b/src/askui/model_providers/vlm_provider.py @@ -8,10 +8,18 @@ ThinkingConfigParam, ToolChoiceParam, ) +from askui.models.shared.coordinate_space import ( + PixelCoordinateSpace, + VlmCoordinateSpace, +) +from askui.models.shared.image_scaler import ContainedImageScaler, ImageScaler from askui.models.shared.prompts import SystemPrompt from askui.models.shared.tools import ToolCollection from askui.utils.model_pricing import ModelPricing +_DEFAULT_COORDINATE_SPACE = PixelCoordinateSpace() +_DEFAULT_IMAGE_SCALER = ContainedImageScaler() + class VlmProvider(ABC): """Interface for Vision Language Model providers. 
@@ -44,6 +52,17 @@ class VlmProvider(ABC): def model_id(self) -> str: """The model identifier used by this provider.""" + @property + def coordinate_space(self) -> VlmCoordinateSpace: + """The coordinate space this model emits coordinates in. + + Returns a `VlmCoordinateSpace` describing the grid the model uses. + The default is `PixelCoordinateSpace` (native pixel coordinates). + Override in subclasses when the model uses a different grid + (e.g. ``ScaledCoordinateSpace(1000, 1000)`` for Qwen). + """ + return _DEFAULT_COORDINATE_SPACE + @property def pricing(self) -> ModelPricing | None: """Pricing information for this provider's model. @@ -53,6 +72,28 @@ def pricing(self) -> ModelPricing | None: """ return None + @property + def image_scaler(self) -> ImageScaler: + """Callable that preprocesses a screenshot before sending to the model. + + Override in subclasses for provider-specific sizing. + """ + return _DEFAULT_IMAGE_SCALER + + def augment_system_prompt(self, system: SystemPrompt) -> SystemPrompt: + """Hook for providers to augment the system prompt before sending. + + Called by ``create_message()`` implementations. The base + implementation returns the prompt unchanged. Override in + subclasses that need to inject provider-specific information + (e.g. coordinate bounds for non-Anthropic models). + + The original ``SystemPrompt`` object is **not** mutated — + implementations should create a new ``SystemPrompt`` wrapping + the augmented text. 
+ """ + return system + @abstractmethod def create_message( self, diff --git a/src/askui/models/anthropic/get_model.py b/src/askui/models/anthropic/get_model.py index 7bed5627..421126e6 100644 --- a/src/askui/models/anthropic/get_model.py +++ b/src/askui/models/anthropic/get_model.py @@ -20,7 +20,7 @@ from askui.models.types.response_schemas import ResponseSchema from askui.prompts.get_prompts import SYSTEM_PROMPT_GET from askui.utils.excel_utils import OfficeDocumentSource -from askui.utils.image_utils import scale_image_to_fit +from askui.utils.llm_image_utils import compute_contained_size, resize_image from askui.utils.pdf_utils import PdfSource from askui.utils.source_utils import Source @@ -78,10 +78,13 @@ def get( if response_schema is not None: error_msg = "Response schema is not yet supported for Anthropic" raise NotImplementedError(error_msg) - scaled_image = scale_image_to_fit( - source.root, - get_settings.resolution, + target_size = compute_contained_size( + source.root.width, + source.root.height, + get_settings.resolution.width, + get_settings.resolution.height, ) + scaled_image = resize_image(source.root, target_size) messages = built_messages_for_get_and_locate(scaled_image, query) message = self._messages_api.create_message( messages=messages, diff --git a/src/askui/models/askui/locate_models/anthropic_locate_model.py b/src/askui/models/askui/locate_models/anthropic_locate_model.py index d2b78c27..3856b8f7 100644 --- a/src/askui/models/askui/locate_models/anthropic_locate_model.py +++ b/src/askui/models/askui/locate_models/anthropic_locate_model.py @@ -20,8 +20,8 @@ from askui.utils.image_utils import ( ImageSource, scale_coordinates, - scale_image_to_fit, ) +from askui.utils.llm_image_utils import compute_contained_size, resize_image class AnthropicLocateModel(LocateModel): @@ -77,14 +77,17 @@ def locate( try: prompt = f"Click on {locator_serialized}" resolution = locate_settings.resolution - screen_width = resolution.width - screen_height = 
resolution.height - scaled_image = scale_image_to_fit( - image.root, - resolution, + target_size = compute_contained_size( + image.root.width, + image.root.height, + resolution.width, + resolution.height, ) + scaled_image = resize_image(image.root, target_size) messages = built_messages_for_get_and_locate(scaled_image, prompt) - system = build_system_prompt_locate(str(screen_width), str(screen_height)) + system = build_system_prompt_locate( + str(scaled_image.width), str(scaled_image.height) + ) message = self._messages_api.create_message( messages=messages, model_id=self._model_id, @@ -100,7 +103,7 @@ def locate( scale_coordinates( extract_click_coordinates(content_text.text), image.root.size, - resolution, + scaled_image.size, inverse=True, ) ] diff --git a/src/askui/models/shared/__init__.py b/src/askui/models/shared/__init__.py index 4df27a7b..ac5186a0 100644 --- a/src/askui/models/shared/__init__.py +++ b/src/askui/models/shared/__init__.py @@ -1,5 +1,12 @@ from .android_base_tool import AndroidBaseTool from .computer_base_tool import ComputerBaseTool +from .coordinate_space import ( + NormalizedCoordinateSpace, + PixelCoordinateSpace, + ScaledCoordinateSpace, + VlmCoordinateSpace, +) +from .image_scaler import ContainedImageScaler, ImageScaler, PatchOptimizedImageScaler from .tool_tags import ToolTags try: @@ -13,7 +20,14 @@ __all__ = [ "AndroidBaseTool", "ComputerBaseTool", + "ContainedImageScaler", + "ImageScaler", + "PatchOptimizedImageScaler", + "NormalizedCoordinateSpace", + "PixelCoordinateSpace", + "ScaledCoordinateSpace", "ToolTags", + "VlmCoordinateSpace", ] if _PLAYWRIGHT_AVAILABLE: diff --git a/src/askui/models/shared/coordinate_space.py b/src/askui/models/shared/coordinate_space.py new file mode 100644 index 00000000..c2cd7c71 --- /dev/null +++ b/src/askui/models/shared/coordinate_space.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod + +from pydantic import BaseModel, Field + + +def 
_common_prompt_lines() -> list[str]: + return ["* Coordinate origin is the top-left corner (0, 0)"] + + +class VlmCoordinateSpace(BaseModel, ABC): + """Abstract base for VLM coordinate conventions. + + Each subclass describes one coordinate grid a VLM may emit and knows + how to map those coordinates back to pixel space and how to render + the matching prompt section. + """ + + @property + def maps_to_screenshot_pixels(self) -> bool: + """Whether model coordinates are absolute pixels in the screenshot image. + + When ``True``, coordinates need padding-aware inverse scaling + (screenshot space to device space). When ``False``, coordinates + are in a normalised grid and map directly to device resolution. + """ + return False + + @abstractmethod + def map_to_target( + self, x: float, y: float, target_resolution: tuple[int, int] + ) -> tuple[int, int]: + """Map model coordinates to pixel coordinates in *target_resolution*.""" + + @abstractmethod + def build_prompt_section(self) -> str: + """Build prompt text describing coordinate bounds for the model.""" + + +class PixelCoordinateSpace(VlmCoordinateSpace): + """Identity mapping -- coordinates already in pixel space. + + Used by Anthropic/Claude which emit coordinates matching the + screenshot resolution. + """ + + @property + def maps_to_screenshot_pixels(self) -> bool: + return True + + def map_to_target( + self, + x: float, + y: float, + target_resolution: tuple[int, int], # noqa: ARG002 + ) -> tuple[int, int]: + return int(x), int(y) + + def build_prompt_section(self) -> str: + lines = _common_prompt_lines() + lines.append( + "* Coordinates are in pixel space matching the screenshot dimensions" + ) + return "\n".join(lines) + + +class ScaledCoordinateSpace(VlmCoordinateSpace): + """Integer grid (e.g. 1000x1000 for Qwen). 
Linear scaling.""" + + width: int = Field(gt=0, description="Width of the coordinate grid") + height: int = Field(gt=0, description="Height of the coordinate grid") + + def map_to_target( + self, x: float, y: float, target_resolution: tuple[int, int] + ) -> tuple[int, int]: + tw, th = target_resolution + return int(x * tw / self.width), int(y * th / self.height) + + def build_prompt_section(self) -> str: + lines = _common_prompt_lines() + lines.append( + f"* Emit coordinates in a {self.width}x{self.height} " + f"normalised grid: 0 <= x < {self.width}, " + f"0 <= y < {self.height}" + ) + return "\n".join(lines) + + +class NormalizedCoordinateSpace(VlmCoordinateSpace): + """0.0-1.0 float grid (Kimi). No fields.""" + + def map_to_target( + self, x: float, y: float, target_resolution: tuple[int, int] + ) -> tuple[int, int]: + tw, th = target_resolution + return int(x * tw), int(y * th) + + def build_prompt_section(self) -> str: + lines = _common_prompt_lines() + lines.append( + "* Emit coordinates as normalised floats: 0.0 <= x <= 1.0, 0.0 <= y <= 1.0" + ) + return "\n".join(lines) diff --git a/src/askui/models/shared/image_scaler.py b/src/askui/models/shared/image_scaler.py new file mode 100644 index 00000000..12a8be29 --- /dev/null +++ b/src/askui/models/shared/image_scaler.py @@ -0,0 +1,86 @@ +"""Image scaler types used by VLM providers.""" + +from abc import ABC, abstractmethod + +from PIL import Image + +from askui.utils.llm_image_utils import ( + compute_contained_size, + compute_patch_optimized_image, + resize_image, +) + + +class ImageScaler(ABC): + """Base class for image scalers used by VLM providers. + + Subclasses implement ``__call__`` to preprocess a screenshot + before it is sent to a model. + """ + + @abstractmethod + def __call__(self, image: Image.Image) -> Image.Image: + """Scale ``image`` for model consumption.""" + + +class PatchOptimizedImageScaler(ImageScaler): + """Image scaler that fits images within a patch-based token budget. 
+ + Uses `compute_patch_optimized_image()` under the hood: the image + is aspect-preserving scaled so that neither dimension exceeds + ``max_edge`` and the total patch count stays within ``max_tokens``. + + Args: + max_edge (int): Maximum allowed dimension (width or height). + max_tokens (int): Maximum allowed number of image tokens. + patch_size (int): Side length of a single patch in pixels. + """ + + def __init__( + self, + max_edge: int = 1568, + max_tokens: int = 1568, + patch_size: int = 28, + ) -> None: + self._max_edge = max_edge + self._max_tokens = max_tokens + self._patch_size = patch_size + + def __call__(self, image: Image.Image) -> Image.Image: + """Scale ``image`` to fit within the configured token budget.""" + return compute_patch_optimized_image( + image, + max_edge=self._max_edge, + max_tokens=self._max_tokens, + patch_size=self._patch_size, + ) + + +class ContainedImageScaler(ImageScaler): + """Image scaler that fits images within maximum width/height bounds. + + Preserves aspect ratio. Images already within the bounds are + returned unchanged. + + Args: + max_width (int): Maximum allowed width. + max_height (int): Maximum allowed height. 
+ """ + + def __init__( + self, + max_width: int = 1024, + max_height: int = 768, + ) -> None: + self._max_width = max_width + self._max_height = max_height + + def __call__(self, image: Image.Image) -> Image.Image: + """Scale ``image`` to fit within the configured bounds.""" + target = compute_contained_size( + image.width, + image.height, + max_width=self._max_width, + max_height=self._max_height, + ) + return resize_image(image, target) diff --git a/src/askui/tools/android/agent_os_facade.py b/src/askui/tools/android/agent_os_facade.py index f27d0eee..e94e42be 100644 --- a/src/askui/tools/android/agent_os_facade.py +++ b/src/askui/tools/android/agent_os_facade.py @@ -1,74 +1,82 @@ -from typing import List, Optional, Tuple +from __future__ import annotations -from PIL import Image +from typing import TYPE_CHECKING from askui.models.shared.tool_tags import ToolTags from askui.tools.android.agent_os import ANDROID_KEY, AndroidAgentOs, AndroidDisplay -from askui.tools.android.uiautomator_hierarchy import UIElementCollection -from askui.utils.image_utils import scale_coordinates, scale_image_to_fit +from askui.tools.coordinate_scaler import CoordinateScaler + +if TYPE_CHECKING: + from PIL import Image + + from askui.models.shared.coordinate_space import VlmCoordinateSpace + from askui.models.shared.image_scaler import ImageScaler + from askui.tools.android.uiautomator_hierarchy import UIElementCollection class AndroidAgentOsFacade(AndroidAgentOs): - """ - Facade for AndroidAgentOs that adds coordinate scaling functionality. - It is used to scale the coordinates to the target resolution - and back to the real screen resolution. + """Facade for `AndroidAgentOs` that adds coordinate scaling. + + Screenshots are scaled using the provider's image scaler so that the + AI model sees an optimally sized image. Coordinate-based inputs + (``tap``, ``swipe``, ``drag_and_drop``) are scaled back up to the + real device resolution before being forwarded to the underlying agent OS. 
+ + Args: + agent_os (`AndroidAgentOs`): The real Android agent OS to wrap. + coordinate_space (`VlmCoordinateSpace`): Coordinate grid the model uses. + image_scaler (`ImageScaler`): Callable to preprocess screenshots. """ - def __init__(self, agent_os: AndroidAgentOs) -> None: + def __init__( + self, + agent_os: AndroidAgentOs, + coordinate_space: VlmCoordinateSpace, + image_scaler: ImageScaler, + ) -> None: self._agent_os: AndroidAgentOs = agent_os - self._target_resolution: Tuple[int, int] = (1024, 768) - self._real_screen_resolution: Optional[Tuple[int, int]] = None + self._scaler = CoordinateScaler( + coordinate_space=coordinate_space, + image_scaler=image_scaler, + fetch_real_resolution=self._fetch_real_resolution, + take_screenshot=self._take_screenshot, + ) self.tags = self._agent_os.tags + [ToolTags.SCALED_AGENT_OS.value] + def _fetch_real_resolution(self) -> tuple[int, int]: + return self._agent_os.screenshot().size + + def _take_screenshot(self) -> Image.Image: + return self.screenshot() + def connect(self) -> None: self._agent_os.connect() - self._real_screen_resolution = self._agent_os.screenshot().size + self._scaler.real_screen_resolution = self._agent_os.screenshot().size def disconnect(self) -> None: self._agent_os.disconnect() - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def screenshot(self) -> Image.Image: screenshot = self._agent_os.screenshot() - self._real_screen_resolution = screenshot.size - return scale_image_to_fit( - screenshot, - self._target_resolution, - ) - - def _scale_coordinates( - self, - x: int, - y: int, - from_agent: bool = True, - ) -> Tuple[int, int]: - if self._real_screen_resolution is None: - self._real_screen_resolution = self._agent_os.screenshot().size - - return scale_coordinates( - (x, y), - self._real_screen_resolution, - self._target_resolution, - inverse=from_agent, - ) + return self._scaler.scale_screenshot(screenshot) - def tap(self, x: int, y: int) -> None: - x, y = 
self._scale_coordinates(x, y) + def tap(self, x: float, y: float) -> None: + x, y = self._scaler.scale_coordinates(x, y) self._agent_os.tap(x, y) def swipe( - self, x1: int, y1: int, x2: int, y2: int, duration_in_ms: int = 1000 + self, x1: float, y1: float, x2: float, y2: float, duration_in_ms: int = 1000 ) -> None: - x1, y1 = self._scale_coordinates(x1, y1) - x2, y2 = self._scale_coordinates(x2, y2) + x1, y1 = self._scaler.scale_coordinates(x1, y1) + x2, y2 = self._scaler.scale_coordinates(x2, y2) self._agent_os.swipe(x1, y1, x2, y2, duration_in_ms) def drag_and_drop( - self, x1: int, y1: int, x2: int, y2: int, duration_in_ms: int = 1000 + self, x1: float, y1: float, x2: float, y2: float, duration_in_ms: int = 1000 ) -> None: - x1, y1 = self._scale_coordinates(x1, y1) - x2, y2 = self._scale_coordinates(x2, y2) + x1, y1 = self._scaler.scale_coordinates(x1, y1) + x2, y2 = self._scaler.scale_coordinates(x2, y2) self._agent_os.drag_and_drop(x1, y1, x2, y2, duration_in_ms) def type(self, text: str) -> None: @@ -78,7 +86,7 @@ def key_tap(self, key: ANDROID_KEY) -> None: self._agent_os.key_tap(key) def key_combination( - self, keys: List[ANDROID_KEY], duration_in_ms: int = 100 + self, keys: list[ANDROID_KEY], duration_in_ms: int = 100 ) -> None: self._agent_os.key_combination(keys, duration_in_ms) @@ -90,27 +98,27 @@ def get_connected_displays(self) -> list[AndroidDisplay]: def set_display_by_index(self, display_index: int = 0) -> None: self._agent_os.set_display_by_index(display_index) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def set_display_by_unique_id(self, display_unique_id: int) -> None: self._agent_os.set_display_by_unique_id(display_unique_id) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def set_display_by_id(self, display_id: int) -> None: self._agent_os.set_display_by_id(display_id) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def 
set_display_by_name(self, display_name: str) -> None: self._agent_os.set_display_by_name(display_name) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def set_device_by_index(self, device_index: int = 0) -> None: self._agent_os.set_device_by_index(device_index) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def set_device_by_serial_number(self, device_sn: str) -> None: self._agent_os.set_device_by_serial_number(device_sn) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def get_connected_devices_serial_numbers(self) -> list[str]: return self._agent_os.get_connected_devices_serial_numbers() @@ -134,7 +142,7 @@ def get_ui_elements(self) -> UIElementCollection: if element.center is None: continue element.set_center( - self._scale_coordinates( + self._scaler.scale_coordinates( x=element.center[0], y=element.center[1], from_agent=False, diff --git a/src/askui/tools/computer_agent_os_facade.py b/src/askui/tools/computer_agent_os_facade.py index 28a1a8c5..676a6454 100644 --- a/src/askui/tools/computer_agent_os_facade.py +++ b/src/askui/tools/computer_agent_os_facade.py @@ -2,12 +2,13 @@ from PIL import Image +from askui.models.shared.coordinate_space import VlmCoordinateSpace +from askui.models.shared.image_scaler import ImageScaler from askui.models.shared.tool_tags import ToolTags from askui.tools.agent_os import ( AgentOs, Coordinate, Display, - DisplaySize, DisplaysListResponse, InputEvent, ModifierKey, @@ -15,7 +16,7 @@ PcKey, ) from askui.tools.askui.askui_controller import RenderObjectStyle # noqa: TC001 -from askui.utils.image_utils import scale_coordinates, scale_image_to_fit +from askui.tools.coordinate_scaler import CoordinateScaler if TYPE_CHECKING: from askui.tools.askui.askui_ui_controller_grpc.generated import ( @@ -29,47 +30,66 @@ class ComputerAgentOsFacade(AgentOs): - """ - Facade for AgentOs that adds coordinate scaling functionality. 
+ """Facade for `AgentOs` that adds coordinate scaling. + + Screenshots are scaled using the provider's image scaler so that the + AI model sees an optimally sized image. Coordinate-based inputs + are scaled back up to the real screen resolution before being forwarded + to the underlying agent OS. - This class is used to scale the coordinates to the target resolution - and back to the real screen resolution. + Args: + agent_os (`AgentOs`): The real agent OS to wrap. + coordinate_space (`VlmCoordinateSpace`): Coordinate grid the model uses. + image_scaler (`ImageScaler`): Callable to preprocess screenshots. """ - def __init__(self, agent_os: AgentOs) -> None: + def __init__( + self, + agent_os: AgentOs, + coordinate_space: VlmCoordinateSpace, + image_scaler: ImageScaler, + ) -> None: self._agent_os = agent_os - self._target_resolution: tuple[int, int] = (1024, 768) - self._real_screen_resolution: DisplaySize | None = None + self._scaler = CoordinateScaler( + coordinate_space=coordinate_space, + image_scaler=image_scaler, + fetch_real_resolution=self._fetch_real_screen_resolution, + take_screenshot=self._take_silent_screenshot, + ) self.tags.append(ToolTags.SCALED_AGENT_OS.value) def connect(self) -> None: self._agent_os.connect() - self._real_screen_resolution = self._agent_os.retrieve_active_display().size + self._scaler.real_screen_resolution = self._fetch_real_screen_resolution() def disconnect(self) -> None: self._agent_os.disconnect() - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def screenshot(self, report: bool = True) -> Image.Image: screenshot = self._agent_os.screenshot(report=report) - self._real_screen_resolution = DisplaySize( - width=screenshot.width, height=screenshot.height - ) - return scale_image_to_fit(screenshot, self._target_resolution) + return self._scaler.scale_screenshot(screenshot) - def mouse_move(self, x: int, y: int, duration: int = 500) -> None: - scaled_x, scaled_y = self._scale_coordinates_back(x, 
y) + def _take_silent_screenshot(self) -> Image.Image: + return self.screenshot(report=False) + + def _fetch_real_screen_resolution(self) -> tuple[int, int]: + display = self._agent_os.retrieve_active_display() + return display.size.width, display.size.height + + def mouse_move(self, x: float, y: float, duration: int = 500) -> None: + scaled_x, scaled_y = self._scaler.scale_coordinates(x, y) self._agent_os.mouse_move(scaled_x, scaled_y, duration) def get_mouse_position(self) -> Coordinate: mouse_position = self._agent_os.get_mouse_position() - scaled_x, scaled_y = self._scale_coordinates_back( + scaled_x, scaled_y = self._scaler.scale_coordinates( mouse_position.x, mouse_position.y, from_agent=False ) return Coordinate(x=scaled_x, y=scaled_y) - def set_mouse_position(self, x: int, y: int) -> None: - scaled_x, scaled_y = self._scale_coordinates_back(x, y) + def set_mouse_position(self, x: float, y: float) -> None: + scaled_x, scaled_y = self._scaler.scale_coordinates(x, y) self._agent_os.set_mouse_position(scaled_x, scaled_y) def type(self, text: str, typing_speed: int = 50) -> None: @@ -113,7 +133,7 @@ def retrieve_active_display(self) -> Display: def set_display(self, display: int = 1) -> None: self._agent_os.set_display(display) - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def run_command(self, command: str, timeout_ms: int = 30000) -> None: self._agent_os.run_command(command, timeout_ms) @@ -290,7 +310,7 @@ def get_file(self, path: str) -> Image.Image | str: """ response = self._agent_os.get_file(path) if isinstance(response, Image.Image): - return scale_image_to_fit(response, self._target_resolution) + return self._scaler.scale_screenshot(response) return response def remove_virtual_displays(self) -> None: @@ -298,21 +318,4 @@ def remove_virtual_displays(self) -> None: Remove virtual displays from the controller, leaving real displays only. 
""" self._agent_os.remove_virtual_displays() - self._real_screen_resolution = None - - def _scale_coordinates_back( - self, - x: int, - y: int, - from_agent: bool = True, - check_coordinates_in_bounds: bool = True, - ) -> tuple[int, int]: - if self._real_screen_resolution is None: - self._real_screen_resolution = self._agent_os.retrieve_active_display().size - return scale_coordinates( - (x, y), - (self._real_screen_resolution.width, self._real_screen_resolution.height), - self._target_resolution, - inverse=from_agent, - check_coordinates_in_bounds=check_coordinates_in_bounds, - ) + self._scaler.real_screen_resolution = None diff --git a/src/askui/tools/coordinate_scaler.py b/src/askui/tools/coordinate_scaler.py new file mode 100644 index 00000000..39d12194 --- /dev/null +++ b/src/askui/tools/coordinate_scaler.py @@ -0,0 +1,99 @@ +"""Coordinate scaling helper used by all agent OS facades.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from askui.utils.image_utils import scale_coordinates + +if TYPE_CHECKING: + from collections.abc import Callable + + from PIL import Image + + from askui.models.shared.coordinate_space import VlmCoordinateSpace + from askui.models.shared.image_scaler import ImageScaler + + +class CoordinateScaler: + """Maps coordinates between model space and device space. + + Each agent OS facade owns an instance and delegates scaling to it. + + Args: + coordinate_space (`VlmCoordinateSpace`): Coordinate grid the model uses. + image_scaler (`ImageScaler`): Callable to preprocess screenshots. + fetch_real_resolution (`Callable`): Callback that returns the real + ``(width, height)`` of the screen/device when it is not yet known. + take_screenshot (`Callable`): Callback that triggers a screenshot + so that ``target_resolution`` can be populated. 
+ """ + + def __init__( + self, + coordinate_space: VlmCoordinateSpace, + image_scaler: ImageScaler, + fetch_real_resolution: Callable[[], tuple[int, int]], + take_screenshot: Callable[[], Image.Image], + ) -> None: + self._coordinate_space = coordinate_space + self._image_scaler = image_scaler + self._fetch_real_resolution = fetch_real_resolution + self._take_screenshot = take_screenshot + self.target_resolution: tuple[int, int] | None = None + self.real_screen_resolution: tuple[int, int] | None = None + + def scale_screenshot(self, screenshot: Image.Image) -> Image.Image: + """Record real resolution, apply scaler, record target resolution.""" + self.real_screen_resolution = screenshot.size + scaled = self._image_scaler(screenshot) + self.target_resolution = scaled.size + return scaled + + def scale_coordinates( + self, + x: float, + y: float, + from_agent: bool = True, + check_coordinates_in_bounds: bool = True, + ) -> tuple[int, int]: + """Map coordinates between model space and device space. + + When ``from_agent=True``, maps model-emitted coordinates to real + device pixels. When ``from_agent=False``, maps device coordinates + to model space (e.g. for reporting element positions back to the model). 
+ """ + if self.real_screen_resolution is None: + self.real_screen_resolution = self._fetch_real_resolution() + + target_resolution = self._ensure_target_resolution() + + if from_agent: + if self._coordinate_space.maps_to_screenshot_pixels: + mapped_x, mapped_y = self._coordinate_space.map_to_target( + x, y, target_resolution + ) + return scale_coordinates( + (mapped_x, mapped_y), + self.real_screen_resolution, + target_resolution, + inverse=True, + check_coordinates_in_bounds=check_coordinates_in_bounds, + ) + return self._coordinate_space.map_to_target( + x, y, self.real_screen_resolution + ) + + return scale_coordinates( + (int(x), int(y)), + self.real_screen_resolution, + target_resolution, + inverse=False, + check_coordinates_in_bounds=check_coordinates_in_bounds, + ) + + def _ensure_target_resolution(self) -> tuple[int, int]: + if self.target_resolution is None: + self._take_screenshot() + assert self.target_resolution is not None # noqa: S101 + return self.target_resolution diff --git a/src/askui/tools/playwright/agent_os_facade.py b/src/askui/tools/playwright/agent_os_facade.py index 091ff804..c6969fe4 100644 --- a/src/askui/tools/playwright/agent_os_facade.py +++ b/src/askui/tools/playwright/agent_os_facade.py @@ -2,65 +2,65 @@ from PIL import Image +from askui.models.shared.coordinate_space import VlmCoordinateSpace +from askui.models.shared.image_scaler import ImageScaler from askui.models.shared.tool_tags import ToolTags from askui.tools.agent_os import Display, ModifierKey, PcKey +from askui.tools.coordinate_scaler import CoordinateScaler from askui.tools.playwright.agent_os import PlaywrightAgentOs -from askui.utils.image_utils import scale_coordinates, scale_image_to_fit class PlaywrightAgentOsFacade(PlaywrightAgentOs): """Facade for `PlaywrightAgentOs` that adds coordinate scaling. - Screenshots are scaled down to a fixed target resolution so that the - AI model always sees a consistent image size. 
Coordinate-based inputs + Screenshots are scaled using the provider's image scaler so that the + AI model sees an optimally sized image. Coordinate-based inputs (``mouse_move``) are scaled back up to the real page resolution before being forwarded to the underlying agent OS. Args: - agent_os (PlaywrightAgentOs): The real Playwright agent OS to wrap. + agent_os (`PlaywrightAgentOs`): The real Playwright agent OS to wrap. + coordinate_space (`VlmCoordinateSpace`): Coordinate grid the model uses. + image_scaler (`ImageScaler`): Callable to preprocess screenshots. """ - def __init__(self, agent_os: PlaywrightAgentOs) -> None: + def __init__( + self, + agent_os: PlaywrightAgentOs, + coordinate_space: VlmCoordinateSpace, + image_scaler: ImageScaler, + ) -> None: self._agent_os = agent_os - self._target_resolution: tuple[int, int] = (1024, 768) - self._real_screen_resolution: tuple[int, int] | None = None + self._scaler = CoordinateScaler( + coordinate_space=coordinate_space, + image_scaler=image_scaler, + fetch_real_resolution=self._fetch_real_resolution, + take_screenshot=self._take_silent_screenshot, + ) self.tags = self._agent_os.tags + [ToolTags.SCALED_AGENT_OS.value] + def _fetch_real_resolution(self) -> tuple[int, int]: + return self._agent_os.screenshot(report=False).size + + def _take_silent_screenshot(self) -> Image.Image: + return self.screenshot(report=False) + def connect(self) -> None: self._agent_os.connect() - self._real_screen_resolution = self._agent_os.screenshot( - report=False, + self._scaler.real_screen_resolution = self._agent_os.screenshot( + report=False ).size def disconnect(self) -> None: self._agent_os.disconnect() - self._real_screen_resolution = None + self._scaler.real_screen_resolution = None def screenshot(self, report: bool = True) -> Image.Image: screenshot = self._agent_os.screenshot(report=report) - self._real_screen_resolution = screenshot.size - return scale_image_to_fit(screenshot, self._target_resolution) - - def 
_scale_coordinates( - self, - x: int, - y: int, - from_agent: bool = True, - ) -> tuple[int, int]: - if self._real_screen_resolution is None: - self._real_screen_resolution = self._agent_os.screenshot( - report=False, - ).size - return scale_coordinates( - (x, y), - self._real_screen_resolution, - self._target_resolution, - inverse=from_agent, - ) + return self._scaler.scale_screenshot(screenshot) - def mouse_move(self, x: int, y: int, duration: int = 500) -> None: - scaled_x, scaled_y = self._scale_coordinates(x, y) - # scaled_x, scaled_y = x, y + def mouse_move(self, x: float, y: float, duration: int = 500) -> None: + scaled_x, scaled_y = self._scaler.scale_coordinates(x, y) self._agent_os.mouse_move(scaled_x, scaled_y, duration) def type(self, text: str, typing_speed: int = 50) -> None: diff --git a/src/askui/tools/store/universal/load_image_tool.py b/src/askui/tools/store/universal/load_image_tool.py index 5a0512e9..b763f2ee 100644 --- a/src/askui/tools/store/universal/load_image_tool.py +++ b/src/askui/tools/store/universal/load_image_tool.py @@ -4,7 +4,7 @@ from PIL import Image from askui.models.shared.tools import Tool -from askui.utils.image_utils import scale_image_to_fit +from askui.utils.llm_image_utils import compute_contained_size, resize_image class LoadImageTool(Tool): @@ -116,7 +116,13 @@ def __call__(self, image_path: str = "") -> Tuple[str, Image.Image]: raise FileExistsError(error_msg) image = Image.open(absolute_image_path) - image = scale_image_to_fit(image, target_size=self._target_size) + target_size = compute_contained_size( + image.width, + image.height, + self._target_size[0], + self._target_size[1], + ) + image = resize_image(image, target_size) return ( f"Image was successfully loaded from {absolute_image_path}", diff --git a/src/askui/utils/llm_image_utils.py b/src/askui/utils/llm_image_utils.py new file mode 100644 index 00000000..ba3ef5cc --- /dev/null +++ b/src/askui/utils/llm_image_utils.py @@ -0,0 +1,218 @@ +"""Image utilities for 
LLM vision model preprocessing. + +Functions for computing optimal image sizes based on patch-based token budgets +and resizing images for VLM consumption. +""" + +import logging +import math + +from PIL import Image + +logger = logging.getLogger(__name__) + + +def count_image_tokens(width: int, height: int, patch_size: int = 28) -> int: + """Count the number of tokens an image will consume in a patch-based VLM. + + Each non-overlapping ``patch_size x patch_size`` square maps to one token. + + Args: + width (int): Image width in pixels. + height (int): Image height in pixels. + patch_size (int): Side length of a single patch in pixels. + + Returns: + int: Number of image tokens. + """ + patches_w = math.ceil(width / patch_size) + patches_h = math.ceil(height / patch_size) + return patches_w * patches_h + + +def compute_patch_optimized_size( + width: int, + height: int, + max_edge: int = 1568, + max_tokens: int = 1568, + patch_size: int = 28, +) -> tuple[int, int]: + """Compute the largest aspect-preserving size within a patch-based token budget. + + Uses binary search to find the biggest scale factor such that: + - Neither dimension exceeds ``max_edge``. + - ``count_image_tokens(w, h, patch_size) <= max_tokens``. + + Args: + width (int): Original image width. + height (int): Original image height. + max_edge (int): Maximum allowed dimension (width or height). + max_tokens (int): Maximum allowed number of image tokens. + patch_size (int): Patch size used by the model. + + Returns: + tuple[int, int]: Target ``(width, height)``. 
+ """ + if width <= 0 or height <= 0: + error_msg = f"Image dimensions must be positive, got {width}x{height}" + raise ValueError(error_msg) + + # If already within all constraints, return as-is + if ( + width <= max_edge + and height <= max_edge + and count_image_tokens(width, height, patch_size) <= max_tokens + ): + return width, height + + # Clamp to max_edge first + scale = min(max_edge / width, max_edge / height, 1.0) + + # Binary search for largest scale that fits within token budget + lo, hi = 0.0, scale + for _ in range(50): + mid = (lo + hi) / 2 + w = max(1, int(width * mid)) + h = max(1, int(height * mid)) + if count_image_tokens(w, h, patch_size) <= max_tokens: + lo = mid + else: + hi = mid + + result_w = max(1, int(width * lo)) + result_h = max(1, int(height * lo)) + return result_w, result_h + + +def compute_contained_size( + width: int, + height: int, + max_width: int = 1024, + max_height: int = 768, +) -> tuple[int, int]: + """Compute the largest aspect-preserving size contained within max bounds. + + If the image already fits, returns its original dimensions. + + Args: + width (int): Original image width. + height (int): Original image height. + max_width (int): Maximum allowed width. + max_height (int): Maximum allowed height. + + Returns: + tuple[int, int]: Target ``(width, height)``. + """ + if width <= 0 or height <= 0: + error_msg = f"Image dimensions must be positive, got {width}x{height}" + raise ValueError(error_msg) + + if width <= max_width and height <= max_height: + return width, height + + scale = min(max_width / width, max_height / height) + return max(1, int(width * scale)), max(1, int(height * scale)) + + +def resize_image(image: Image.Image, target_size: tuple[int, int]) -> Image.Image: + """Resize an image to exact ``target_size`` using LANCZOS resampling. + + Logs a warning if the aspect ratio changes by more than 1%. + + Args: + image (Image.Image): Source image. + target_size (tuple[int, int]): Target ``(width, height)``. 
+ + Returns: + Image.Image: Resized image. + """ + if image.size == target_size: + return image + + src_ratio = image.width / image.height + dst_ratio = target_size[0] / target_size[1] + if abs(src_ratio - dst_ratio) / max(src_ratio, dst_ratio)> 0.01: + logger.warning( + "Aspect ratio change during resize: %.3f -> %.3f", + src_ratio, + dst_ratio, + ) + + return image.resize(target_size, Image.Resampling.LANCZOS) + + +def compute_patch_optimized_image( + image: Image.Image, + max_edge: int = 1568, + max_tokens: int = 1568, + patch_size: int = 28, +) -> Image.Image: + """Resize an image to its patch-optimized size. + + Convenience wrapper that combines `compute_patch_optimized_size` and + `resize_image` into a single call. + + Args: + image (Image.Image): Source image. + max_edge (int): Maximum allowed dimension (width or height). + max_tokens (int): Maximum allowed number of image tokens. + patch_size (int): Patch size used by the model. + + Returns: + Image.Image: Resized image. + """ + target = compute_patch_optimized_size( + image.width, + image.height, + max_edge=max_edge, + max_tokens=max_tokens, + patch_size=patch_size, + ) + return resize_image(image, target) + + +def downscale_image( + image: Image.Image, + max_dimension: int = 2000, +) -> Image.Image: + """Downscale an image so its longest side does not exceed `max_dimension`. + + Convenience wrapper around `compute_contained_size()` and `resize_image()`. + Unlike ``scale_image_to_fit()`` from `askui.utils.image_utils`, this does + **not** add black padding — the output keeps its natural dimensions. + + Preserves the original aspect ratio. Images that are already + within the limit are returned unchanged. + + Args: + image (Image.Image): The PIL Image to downscale. + max_dimension (int, optional): Maximum allowed size for the longest side. + Defaults to `2000`. + + Returns: + Image.Image: The downscaled image, or the original if no scaling was needed. 
+ """ + target = compute_contained_size( + image.width, image.height, max_width=max_dimension, max_height=max_dimension + ) + return resize_image(image, target) + + +def resize_and_pad_image( + image: Image.Image, + target_size: tuple[int, int], +) -> Image.Image: + """Resize preserving aspect ratio, then center on a padded canvas. + + Equivalent to the legacy ``scale_image_to_fit`` behaviour. + + Args: + image (Image.Image): Source image. + target_size (tuple[int, int]): Canvas ``(width, height)``. + + Returns: + Image.Image: Image centered on a ``target_size`` canvas. + """ + from askui.utils.image_utils import scale_image_to_fit + + return scale_image_to_fit(image, target_size) diff --git a/src/askui/web_agent.py b/src/askui/web_agent.py index fe47c5f9..d1c94232 100644 --- a/src/askui/web_agent.py +++ b/src/askui/web_agent.py @@ -60,7 +60,6 @@ def __init__( ) -> None: reporter = CompositeReporter(reporters=reporters) self.os = PlaywrightAgentOs(reporter) - self.act_agent_os_facade = PlaywrightAgentOsFacade(self.os) super().__init__( reporter=reporter, retry=retry, @@ -70,6 +69,11 @@ def __init__( callbacks=callbacks, truncation_strategy=truncation_strategy, ) + self.act_agent_os_facade = PlaywrightAgentOsFacade( + self.os, + coordinate_space=self._vlm_provider.coordinate_space, + image_scaler=self._vlm_provider.image_scaler, + ) self.act_tool_collection.add_agent_os(self.act_agent_os_facade) self.act_settings = ActSettings( messages=MessageSettings( diff --git a/tests/unit/model_providers/test_ollama_vlm_provider.py b/tests/unit/model_providers/test_ollama_vlm_provider.py index 143e7c35..e4fe32d3 100644 --- a/tests/unit/model_providers/test_ollama_vlm_provider.py +++ b/tests/unit/model_providers/test_ollama_vlm_provider.py @@ -6,6 +6,10 @@ from askui.model_providers.ollama_vlm_provider import OllamaVlmProvider from askui.models.shared.agent_message_param import MessageParam +from askui.models.shared.coordinate_space import ( + PixelCoordinateSpace, + 
ScaledCoordinateSpace, +) class TestOllamaVlmProvider: @@ -48,3 +52,66 @@ def test_create_message_delegates_to_messages_api(self) -> None: mock_client.chat.completions.create.assert_called_once() assert result.role == "assistant" + + def test_coordinate_space_auto_detects_qwen(self) -> None: + provider = OllamaVlmProvider(model_id="qwen3.5") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) + + def test_coordinate_space_auto_detects_qwen_case_insensitive(self) -> None: + provider = OllamaVlmProvider(model_id="Qwen2-VL") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) + + def test_coordinate_space_auto_detects_kimi(self) -> None: + provider = OllamaVlmProvider(model_id="kimi-vl") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) + + def test_coordinate_space_auto_detects_kimi_case_insensitive(self) -> None: + provider = OllamaVlmProvider(model_id="Kimi-VL-A3B") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) + + def test_coordinate_space_default_for_non_qwen(self) -> None: + provider = OllamaVlmProvider(model_id="llava") + assert provider.coordinate_space == PixelCoordinateSpace() + + def test_coordinate_space_explicit_override(self) -> None: + provider = OllamaVlmProvider( + model_id="llava", + coordinate_space=ScaledCoordinateSpace(width=500, height=500), + ) + assert provider.coordinate_space == ScaledCoordinateSpace(width=500, height=500) + + def test_coordinate_space_explicit_override_takes_precedence(self) -> None: + provider = OllamaVlmProvider( + model_id="qwen3.5", + coordinate_space=ScaledCoordinateSpace(width=2000, height=2000), + ) + assert provider.coordinate_space == ScaledCoordinateSpace( + width=2000, height=2000 + ) + + def test_coordinate_space_explicit_pixel_overrides_qwen_auto_detect(self) -> None: + provider = OllamaVlmProvider( + model_id="qwen3.5", + 
coordinate_space=PixelCoordinateSpace(), + ) + assert provider.coordinate_space == PixelCoordinateSpace() + + def test_coordinate_space_auto_detects_holo(self) -> None: + provider = OllamaVlmProvider(model_id="holo3.1-35b-a3b") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) + + def test_coordinate_space_auto_detects_holo_case_insensitive(self) -> None: + provider = OllamaVlmProvider(model_id="Holo-3.1-4B") + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) diff --git a/tests/unit/model_providers/test_openai_vlm_provider.py b/tests/unit/model_providers/test_openai_vlm_provider.py index d51ff74b..a2b3d95a 100644 --- a/tests/unit/model_providers/test_openai_vlm_provider.py +++ b/tests/unit/model_providers/test_openai_vlm_provider.py @@ -3,9 +3,17 @@ from unittest.mock import MagicMock from openai import OpenAI +from PIL import Image from askui.model_providers.openai_vlm_provider import OpenAIVlmProvider from askui.models.shared.agent_message_param import MessageParam +from askui.models.shared.coordinate_space import ( + NormalizedCoordinateSpace, + PixelCoordinateSpace, + ScaledCoordinateSpace, +) +from askui.models.shared.image_scaler import ImageScaler +from askui.models.shared.prompts import SystemPrompt class TestOpenAIVlmProvider: @@ -41,3 +49,147 @@ def test_create_message_delegates_to_messages_api(self) -> None: mock_client.chat.completions.create.assert_called_once() assert result.role == "assistant" + + def test_coordinate_space_defaults_to_pixel(self) -> None: + provider = OpenAIVlmProvider(model_id="gpt-4o", api_key="sk-test") + assert provider.coordinate_space == PixelCoordinateSpace() + + def test_coordinate_space_passthrough(self) -> None: + provider = OpenAIVlmProvider( + model_id="gpt-4o", + api_key="sk-test", + coordinate_space=ScaledCoordinateSpace(width=1000, height=1000), + ) + assert provider.coordinate_space == ScaledCoordinateSpace( + width=1000, height=1000 + ) 
+ + def test_augment_system_prompt_scaled_coordinate_space(self) -> None: + provider = OpenAIVlmProvider( + model_id="gpt-4o", + api_key="sk-test", + coordinate_space=ScaledCoordinateSpace(width=1000, height=1000), + ) + system = SystemPrompt(prompt="You are a helpful assistant.") + augmented = provider.augment_system_prompt(system) + + rendered = str(augmented) + assert "You are a helpful assistant." in rendered + assert "1000x1000 normalised grid" in rendered + + def test_augment_system_prompt_pixel_coordinate_space(self) -> None: + provider = OpenAIVlmProvider(model_id="gpt-4o", api_key="sk-test") + system = SystemPrompt(prompt="Base prompt.") + augmented = provider.augment_system_prompt(system) + + rendered = str(augmented) + assert "normalised grid" not in rendered + assert "pixel space matching the screenshot dimensions" in rendered + + +class TestImageScaler: + def test_default_scaler_returns_valid_image(self) -> None: + provider = OpenAIVlmProvider(model_id="gpt-4o", api_key="sk-test") + img = Image.new("RGB", (1920, 1080)) + scaled = provider.image_scaler(img) + assert scaled.width <= 2048 + assert scaled.height <= 2048 + + def test_custom_scaler_override(self) -> None: + class _FixedSizeScaler(ImageScaler): + def __call__(self, image: Image.Image) -> Image.Image: + return image.resize((100, 100)) + + provider = OpenAIVlmProvider( + model_id="gpt-4o", + api_key="sk-test", + image_scaler=_FixedSizeScaler(), + ) + img = Image.new("RGB", (1920, 1080)) + scaled = provider.image_scaler(img) + assert scaled.size == (100, 100) + + +class TestPixelCoordinateSpacePrompt: + def test_shows_pixel_space_description(self) -> None: + cs = PixelCoordinateSpace() + result = cs.build_prompt_section() + assert "pixel space matching the screenshot dimensions" in result + assert "normalised grid" not in result + + def test_includes_origin_info(self) -> None: + cs = PixelCoordinateSpace() + result = cs.build_prompt_section() + assert "top-left" in result + + +class 
TestScaledCoordinateSpacePrompt: + def test_shows_normalised_grid(self) -> None: + cs = ScaledCoordinateSpace(width=1000, height=1000) + result = cs.build_prompt_section() + assert "1000x1000 normalised grid" in result + assert "0 <= x < 1000" in result + assert "0 <= y < 1000" in result + + def test_includes_origin_info(self) -> None: + cs = ScaledCoordinateSpace(width=1000, height=1000) + result = cs.build_prompt_section() + assert "top-left" in result + + +class TestNormalizedCoordinateSpacePrompt: + def test_shows_normalised_floats(self) -> None: + cs = NormalizedCoordinateSpace() + result = cs.build_prompt_section() + assert "0.0 <= x <= 1.0" in result + assert "0.0 <= y <= 1.0" in result + assert "normalised floats" in result + + def test_includes_origin_info(self) -> None: + cs = NormalizedCoordinateSpace() + result = cs.build_prompt_section() + assert "top-left" in result + + +class TestMapsToScreenshotPixels: + def test_pixel_returns_true(self) -> None: + assert PixelCoordinateSpace().maps_to_screenshot_pixels is True + + def test_scaled_returns_false(self) -> None: + assert ( + ScaledCoordinateSpace(width=1000, height=1000).maps_to_screenshot_pixels + is False + ) + + def test_normalized_returns_false(self) -> None: + assert NormalizedCoordinateSpace().maps_to_screenshot_pixels is False + + +class TestMapToTarget: + def test_pixel_identity(self) -> None: + cs = PixelCoordinateSpace() + assert cs.map_to_target(512, 384, (1024, 768)) == (512, 384) + + def test_pixel_truncates_floats(self) -> None: + cs = PixelCoordinateSpace() + assert cs.map_to_target(512.7, 384.3, (1024, 768)) == (512, 384) + + def test_scaled_maps_correctly(self) -> None: + cs = ScaledCoordinateSpace(width=1000, height=1000) + assert cs.map_to_target(500, 500, (1024, 768)) == (512, 384) + + def test_scaled_zero(self) -> None: + cs = ScaledCoordinateSpace(width=1000, height=1000) + assert cs.map_to_target(0, 0, (1024, 768)) == (0, 0) + + def test_normalized_maps_correctly(self) -> None: + 
cs = NormalizedCoordinateSpace() + assert cs.map_to_target(0.5, 0.5, (1024, 768)) == (512, 384) + + def test_normalized_zero(self) -> None: + cs = NormalizedCoordinateSpace() + assert cs.map_to_target(0.0, 0.0, (1024, 768)) == (0, 0) + + def test_normalized_one(self) -> None: + cs = NormalizedCoordinateSpace() + assert cs.map_to_target(1.0, 1.0, (1024, 768)) == (1024, 768) diff --git a/tests/unit/tools/test_agent_os_facade_coordinates.py b/tests/unit/tools/test_agent_os_facade_coordinates.py new file mode 100644 index 00000000..dc36f21f --- /dev/null +++ b/tests/unit/tools/test_agent_os_facade_coordinates.py @@ -0,0 +1,268 @@ +"""Tests for coordinate mapping in agent OS facades. + +Verifies that non-pixel coordinate spaces (Qwen 0-1000, Kimi 0.0-1.0) +map directly to device resolution, bypassing the padded screenshot space. +""" + +from unittest.mock import MagicMock + +import pytest +from PIL import Image + +from askui.models.shared.coordinate_space import ( + NormalizedCoordinateSpace, + PixelCoordinateSpace, + ScaledCoordinateSpace, +) +from askui.models.shared.image_scaler import ContainedImageScaler +from askui.tools.android.agent_os_facade import AndroidAgentOsFacade + +_default_scaler = ContainedImageScaler() + + +def _make_android_facade( + device_size: tuple[int, int], + coordinate_space: PixelCoordinateSpace + | ScaledCoordinateSpace + | NormalizedCoordinateSpace, +) -> AndroidAgentOsFacade: + """Create an AndroidAgentOsFacade with a mocked agent OS.""" + mock_os = MagicMock() + mock_os.tags = [] + mock_os.screenshot.return_value = Image.new("RGB", device_size) + facade = AndroidAgentOsFacade( + mock_os, + coordinate_space=coordinate_space, + image_scaler=_default_scaler, + ) + facade._scaler.real_screen_resolution = device_size + # Set target resolution as the scaler would produce it + scaled = _default_scaler(Image.new("RGB", device_size)) + facade._scaler.target_resolution = scaled.size + return facade + + +class TestScaledCoordinateSpaceTallDevice: + 
"""Qwen 0-1000 grid on a tall Android device (1080x2400). + + Non-pixel coordinate spaces map directly to device resolution, + so no padding offset is involved. + """ + + device = (1080, 2400) + cs = ScaledCoordinateSpace(width=1000, height=1000) + + def test_center_tap(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(500, 500) + assert (x, y) == (540, 1200) + + def test_left_side_tap(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(200, 500) + assert (x, y) == (216, 1200) + + def test_swipe_across(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x1, y1 = facade._scaler.scale_coordinates(500, 500) + x2, y2 = facade._scaler.scale_coordinates(200, 500) + assert (x1, y1) == (540, 1200) + assert (x2, y2) == (216, 1200) + + def test_origin(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(0, 0) + assert (x, y) == (0, 0) + + def test_max_corner(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(1000, 1000) + assert (x, y) == (1080, 2400) + + +class TestNormalizedCoordinateSpaceTallDevice: + """Kimi 0.0-1.0 grid on a tall Android device (1080x2400).""" + + device = (1080, 2400) + cs = NormalizedCoordinateSpace() + + def test_center_tap(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(0.5, 0.5) + assert (x, y) == (540, 1200) + + def test_left_side_tap(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(0.2, 0.5) + assert (x, y) == (216, 1200) + + +class TestPixelCoordinateSpaceTallDevice: + """Claude pixel coordinates on a tall Android device (1080x2400). + + With the no-padding scaler, a 1080x2400 device is scaled to + compute_contained_size(1080, 2400, 1024, 768) = (345, 768). 
+ Pixel coordinates are in the (345, 768) screenshot space and go + through the padding-aware inverse scaling pipeline. Because the + image nearly fills the target (only ~2 px rounding slack), offsets + are close to zero but not exactly zero. + """ + + device = (1080, 2400) + cs = PixelCoordinateSpace() + + def test_center_of_content(self) -> None: + """The center of the content area in the scaled screenshot.""" + facade = _make_android_facade(self.device, self.cs) + # Target resolution is (345, 768) — nearly no padding + x, y = facade._scaler.scale_coordinates(172, 384) + assert x == pytest.approx(538, abs=5) + assert y == pytest.approx(1200, abs=5) + + def test_near_top_left_of_content(self) -> None: + """Coordinate near top-left corner maps back close to origin.""" + facade = _make_android_facade(self.device, self.cs) + # Use (1, 2) instead of exact origin to avoid rounding-offset + # edge case that can produce small negative values. + x, y = facade._scaler.scale_coordinates(1, 2) + assert x == pytest.approx(3, abs=5) + assert y == pytest.approx(3, abs=5) + + +class TestSquareDevice: + """Verify no regression on a device with matching aspect ratio.""" + + device = (1024, 768) + cs = ScaledCoordinateSpace(width=1000, height=1000) + + def test_center(self) -> None: + facade = _make_android_facade(self.device, self.cs) + x, y = facade._scaler.scale_coordinates(500, 500) + assert (x, y) == (512, 384) + + +class TestFromAgentFalse: + """from_agent=False always maps device → screenshot pixel space.""" + + def test_device_to_screenshot_scaled_space(self) -> None: + facade = _make_android_facade( + (1080, 2400), ScaledCoordinateSpace(width=1000, height=1000) + ) + x, y = facade._scaler.scale_coordinates(540, 1200, from_agent=False) + # Target resolution is (345, 768), no padding + # Forward scaling: factor = 768/2400 = 0.32 + # x = 540 * 0.32 = 172.8 → 172, y = 1200 * 0.32 = 384 + assert x == pytest.approx(172, abs=2) + assert y == pytest.approx(384, abs=2) + + +# 
--------------------------------------------------------------------------- +# Parametrized tests across multiple resolutions +# --------------------------------------------------------------------------- + +_DEVICE_SIZES = [ + pytest.param((1080, 1920), id="FHD portrait"), + pytest.param((1920, 1080), id="FHD landscape"), + pytest.param((1440, 2560), id="QHD portrait"), + pytest.param((2560, 1440), id="QHD landscape"), + pytest.param((1080, 2400), id="tall Android"), + pytest.param((768, 1024), id="iPad portrait"), + pytest.param((320, 480), id="small phone"), + pytest.param((3840, 2160), id="4K landscape"), +] + + +class TestScaledCenterAcrossResolutions: + """Center tap (500, 500) in 0-1000 grid should always map to device center.""" + + cs = ScaledCoordinateSpace(width=1000, height=1000) + + @pytest.mark.parametrize("device_size", _DEVICE_SIZES) + def test_center_maps_to_device_center(self, device_size: tuple[int, int]) -> None: + facade = _make_android_facade(device_size, self.cs) + x, y = facade._scaler.scale_coordinates(500, 500) + assert x == device_size[0] // 2 + assert y == device_size[1] // 2 + + +class TestNormalizedCenterAcrossResolutions: + """Center tap (0.5, 0.5) in normalized grid should always map to device center.""" + + cs = NormalizedCoordinateSpace() + + @pytest.mark.parametrize("device_size", _DEVICE_SIZES) + def test_center_maps_to_device_center(self, device_size: tuple[int, int]) -> None: + facade = _make_android_facade(device_size, self.cs) + x, y = facade._scaler.scale_coordinates(0.5, 0.5) + assert x == device_size[0] // 2 + assert y == device_size[1] // 2 + + +class TestPixelRoundTripAcrossResolutions: + """Pixel-space center of scaled image should round-trip close to device center.""" + + cs = PixelCoordinateSpace() + + @pytest.mark.parametrize("device_size", _DEVICE_SIZES) + def test_pixel_center_round_trip(self, device_size: tuple[int, int]) -> None: + facade = _make_android_facade(device_size, self.cs) + target = 
facade._scaler.target_resolution + assert target is not None + cx, cy = target[0] // 2, target[1] // 2 + x, y = facade._scaler.scale_coordinates(cx, cy) + assert x == pytest.approx(device_size[0] // 2, abs=5) + assert y == pytest.approx(device_size[1] // 2, abs=5) + + +# --------------------------------------------------------------------------- +# Negative / edge-case tests +# --------------------------------------------------------------------------- + + +class TestOutOfBoundsCoordinates: + """Coordinates outside the valid range should raise ValueError.""" + + def test_negative_coordinates_pixel_space(self) -> None: + facade = _make_android_facade((1080, 1920), PixelCoordinateSpace()) + with pytest.raises(ValueError, match="out of bounds"): + facade._scaler.scale_coordinates(-10, -10) + + def test_exceeding_target_pixel_space(self) -> None: + facade = _make_android_facade((1080, 1920), PixelCoordinateSpace()) + target = facade._scaler.target_resolution + assert target is not None + with pytest.raises(ValueError, match="out of bounds"): + facade._scaler.scale_coordinates(target[0] + 100, target[1] + 100) + + def test_bounds_check_can_be_disabled(self) -> None: + facade = _make_android_facade((1080, 1920), PixelCoordinateSpace()) + target = facade._scaler.target_resolution + assert target is not None + # Should not raise when bounds checking is off + facade._scaler.scale_coordinates( + target[0] + 100, target[1] + 100, check_coordinates_in_bounds=False + ) + + +class TestResolutionLazyInit: + """Verify that real_screen_resolution is fetched lazily when not set.""" + + def test_fetches_resolution_on_first_scale(self) -> None: + mock_os = MagicMock() + mock_os.tags = [] + device_size = (1080, 1920) + mock_os.screenshot.return_value = Image.new("RGB", device_size) + cs = ScaledCoordinateSpace(width=1000, height=1000) + facade = AndroidAgentOsFacade( + mock_os, coordinate_space=cs, image_scaler=_default_scaler + ) + # real_screen_resolution starts unset + assert 
facade._scaler.real_screen_resolution is None # noqa: S101 + # Trigger a screenshot to populate target_resolution + facade.screenshot() + # Now scale — should have both resolutions set + scaler = facade._scaler + x, y = scaler.scale_coordinates(500, 500) + assert scaler.real_screen_resolution == device_size + assert x == 540 + assert y == 960 diff --git a/tests/unit/utils/test_llm_image_utils.py b/tests/unit/utils/test_llm_image_utils.py new file mode 100644 index 00000000..714d84c0 --- /dev/null +++ b/tests/unit/utils/test_llm_image_utils.py @@ -0,0 +1,145 @@ +"""Tests for LLM image utility functions.""" + +import logging + +import pytest +from PIL import Image + +from askui.utils.llm_image_utils import ( + compute_contained_size, + compute_patch_optimized_size, + count_image_tokens, + resize_and_pad_image, + resize_image, +) + + +class TestCountImageTokens: + def test_exact_patches(self) -> None: + # 56x56 with patch_size=28 → 2x2 = 4 tokens + assert count_image_tokens(56, 56, patch_size=28) == 4 + + def test_single_patch(self) -> None: + assert count_image_tokens(28, 28, patch_size=28) == 1 + + def test_partial_patches_round_up(self) -> None: + # 30x30 with patch_size=28 → ceil(30/28) * ceil(30/28) = 2*2 = 4 + assert count_image_tokens(30, 30, patch_size=28) == 4 + + def test_known_anthropic_value(self) -> None: + # 1568x1568 with patch_size=28 → 56*56 = 3136 + assert count_image_tokens(1568, 1568, patch_size=28) == 3136 + + def test_rectangular(self) -> None: + # 1024x768 with patch_size=28 → ceil(1024/28)*ceil(768/28) = 37*28 = 1036 + assert count_image_tokens(1024, 768, patch_size=28) == 37 * 28 + + +class TestComputePatchOptimizedSize: + def test_small_image_unchanged(self) -> None: + # A small image that fits within all constraints is returned as-is + w, h = compute_patch_optimized_size(200, 100) + assert w == 200 + assert h == 100 + + def test_respects_max_edge(self) -> None: + w, h = compute_patch_optimized_size(3000, 2000, max_edge=1568) + assert w <= 
1568 + assert h <= 1568 + + def test_respects_max_tokens(self) -> None: + w, h = compute_patch_optimized_size( + 1920, 1080, max_edge=1568, max_tokens=1568, patch_size=28 + ) + tokens = count_image_tokens(w, h, patch_size=28) + assert tokens <= 1568 + + def test_preserves_aspect_ratio(self) -> None: + w, h = compute_patch_optimized_size(1920, 1080) + original_ratio = 1920 / 1080 + result_ratio = w / h + assert abs(original_ratio - result_ratio) / original_ratio < 0.02 + + def test_invalid_dimensions_raises(self) -> None: + with pytest.raises(ValueError, match="positive"): + compute_patch_optimized_size(0, 100) + + def test_openai_params(self) -> None: + w, h = compute_patch_optimized_size( + 1920, 1080, max_edge=2048, max_tokens=1536, patch_size=32 + ) + tokens = count_image_tokens(w, h, patch_size=32) + assert tokens <= 1536 + assert w <= 2048 + assert h <= 2048 + + +class TestComputeContainedSize: + def test_already_fits(self) -> None: + assert compute_contained_size(800, 600, 1024, 768) == (800, 600) + + def test_exact_match(self) -> None: + assert compute_contained_size(1024, 768, 1024, 768) == (1024, 768) + + def test_landscape_too_wide(self) -> None: + w, h = compute_contained_size(2048, 768, 1024, 768) + assert w <= 1024 + assert h <= 768 + + def test_portrait_too_tall(self) -> None: + w, h = compute_contained_size(768, 2048, 1024, 768) + assert w <= 1024 + assert h <= 768 + + def test_preserves_aspect_ratio(self) -> None: + w, h = compute_contained_size(1920, 1080, 1024, 768) + original_ratio = 1920 / 1080 + result_ratio = w / h + assert abs(original_ratio - result_ratio) / original_ratio < 0.02 + + def test_invalid_dimensions_raises(self) -> None: + with pytest.raises(ValueError, match="positive"): + compute_contained_size(0, 100) + + +class TestResizeImage: + def test_correct_dimensions(self) -> None: + img = Image.new("RGB", (1920, 1080)) + result = resize_image(img, (1024, 576)) + assert result.size == (1024, 576) + + def test_no_op_when_same_size(self) 
-> None: + img = Image.new("RGB", (1024, 768)) + result = resize_image(img, (1024, 768)) + assert result is img # Same object, no copy + + def test_aspect_ratio_warning_logged( + self, caplog: pytest.LogCaptureFixture + ) -> None: + img = Image.new("RGB", (1920, 1080)) + with caplog.at_level(logging.WARNING): + resize_image(img, (1024, 768)) + assert "Aspect ratio change" in caplog.text + + def test_no_warning_when_ratio_preserved( + self, caplog: pytest.LogCaptureFixture + ) -> None: + img = Image.new("RGB", (1920, 1080)) + with caplog.at_level(logging.WARNING): + resize_image(img, (960, 540)) + assert "Aspect ratio change" not in caplog.text + + +class TestResizeAndPadImage: + def test_correct_dimensions(self) -> None: + img = Image.new("RGB", (1920, 1080)) + result = resize_and_pad_image(img, (1024, 768)) + assert result.size == (1024, 768) + + def test_preserves_aspect_ratio_with_padding(self) -> None: + img = Image.new("RGB", (1080, 2400), color=(255, 0, 0)) + result = resize_and_pad_image(img, (1024, 768)) + assert result.size == (1024, 768) + # Check that some padding exists (black pixels at edges) + left_pixel = result.getpixel((0, 0)) + assert left_pixel == (0, 0, 0) # Black padding