-
Notifications
You must be signed in to change notification settings - Fork 134
feat(idb_client): installation + setup #130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
charlestheprogrammer
wants to merge
3
commits into
main
Choose a base branch
from
feat/idb-client-setup
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+286
β0
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
bad4866
feat(idb_client): installation + setup
charlestheprogrammer 52e9175
feat(idb_client): manage idb_companion lifecycle if no host is provided
charlestheprogrammer d06997f
feat(idb_client): global try-expect block and explicit return type
charlestheprogrammer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,278 @@ | ||
| import asyncio | ||
| import json | ||
| import socket | ||
| import subprocess | ||
| from functools import wraps | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| from idb.common.types import HIDButtonType, InstalledAppInfo, InstalledArtifact, TCPAddress | ||
| from idb.grpc.client import Client | ||
|
|
||
| from minitap.mobile_use.utils.logger import get_logger | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
|
|
||
| def _find_available_port(start_port: int = 10882, max_attempts: int = 100) -> int: | ||
| for port in range(start_port, start_port + max_attempts): | ||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||
| try: | ||
| s.bind(("localhost", port)) | ||
| return port | ||
| except OSError: | ||
| continue | ||
| raise RuntimeError( | ||
| f"Could not find available port in range {start_port}-{start_port + max_attempts}" | ||
| ) | ||
|
|
||
|
|
||
| def with_idb_client(func): | ||
| """Decorator to handle idb client lifecycle. | ||
|
|
||
| Note: Function must have None or bool in return type. | ||
| """ | ||
|
|
||
| @wraps(func) | ||
| async def wrapper(self, *args, **kwargs): | ||
| try: | ||
| async with Client.build(address=self.address, logger=logger.logger) as client: | ||
| return await func(self, client, *args, **kwargs) | ||
| except Exception as e: | ||
| method_name = func.__name__ | ||
| logger.error(f"Failed to {method_name}: {e}") | ||
|
|
||
| return_type = func.__annotations__.get("return") | ||
| if return_type is bool: | ||
| return False | ||
| return None | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| class IdbClientWrapper: | ||
| """Wrapper around fb-idb client for iOS device automation with lifecycle management. | ||
|
|
||
| This wrapper can either manage the idb_companion process lifecycle locally or connect | ||
| to an external companion server. | ||
|
|
||
| Lifecycle Management: | ||
| - If host is None (default): Manages companion locally on localhost | ||
| - Call init_companion() to start the idb_companion process | ||
| - Call cleanup() to stop the companion process | ||
| - Or use as async context manager for automatic lifecycle | ||
| - If host is provided: Connects to external companion server | ||
| - init_companion() and cleanup() become no-ops | ||
| - You manage the external companion separately | ||
|
|
||
| Example: | ||
| # Managed companion (recommended for local development) | ||
| async with IdbClientWrapper(udid="device-id") as wrapper: | ||
| await wrapper.tap(100, 200) | ||
|
|
||
| # External companion (for production/remote) | ||
| wrapper = IdbClientWrapper(udid="device-id", host="remote-host", port=10882) | ||
| await wrapper.tap(100, 200) # No companion lifecycle management needed | ||
| """ | ||
|
|
||
| def __init__(self, udid: str, host: str | None = None, port: int | None = None): | ||
| self.udid = udid | ||
| self._manage_companion = host is None | ||
|
|
||
| if host is None: | ||
| actual_port = port if port is not None else _find_available_port() | ||
| self.address = TCPAddress(host="localhost", port=actual_port) | ||
| logger.debug(f"Will manage companion for {udid} on port {actual_port}") | ||
| else: | ||
| actual_port = port if port is not None else 10882 | ||
| self.address = TCPAddress(host=host, port=actual_port) | ||
|
|
||
| self.companion_process: subprocess.Popen | None = None | ||
|
|
||
| async def init_companion(self, idb_companion_path: str = "idb_companion") -> bool: | ||
| """ | ||
| Start the idb_companion process for this device. | ||
| Only starts if managing companion locally (host was None in __init__). | ||
|
|
||
| Args: | ||
| idb_companion_path: Path to idb_companion binary (default: "idb_companion" from PATH) | ||
|
|
||
| Returns: | ||
| True if companion started successfully, False otherwise | ||
| """ | ||
| if not self._manage_companion: | ||
| logger.info(f"Using external idb_companion at {self.address.host}:{self.address.port}") | ||
| return True | ||
|
|
||
| if self.companion_process is not None: | ||
| logger.warning(f"idb_companion already running for {self.udid}") | ||
| return True | ||
|
|
||
| try: | ||
| cmd = [idb_companion_path, "--udid", self.udid, "--grpc-port", str(self.address.port)] | ||
|
|
||
| logger.info(f"Starting idb_companion: {' '.join(cmd)}") | ||
| self.companion_process = subprocess.Popen( | ||
| cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True | ||
| ) | ||
|
|
||
| await asyncio.sleep(2) | ||
|
|
||
| if self.companion_process.poll() is not None: | ||
| stdout, stderr = self.companion_process.communicate() | ||
| logger.error(f"idb_companion failed to start: {stderr}") | ||
| self.companion_process = None | ||
| return False | ||
|
|
||
| logger.info( | ||
| f"idb_companion started successfully for {self.udid} on port {self.address.port}" | ||
| ) | ||
| return True | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to start idb_companion: {e}") | ||
| self.companion_process = None | ||
| return False | ||
|
|
||
| async def cleanup(self) -> None: | ||
| if not self._manage_companion: | ||
| logger.debug(f"Not managing companion for {self.udid}, skipping cleanup") | ||
| return | ||
|
|
||
| if self.companion_process is None: | ||
| return | ||
|
|
||
| try: | ||
| logger.info(f"Stopping idb_companion for {self.udid}") | ||
|
|
||
| self.companion_process.terminate() | ||
|
|
||
| try: | ||
| await asyncio.wait_for(asyncio.to_thread(self.companion_process.wait), timeout=5.0) | ||
| logger.info(f"idb_companion stopped gracefully for {self.udid}") | ||
| except TimeoutError: | ||
| logger.warning(f"Force killing idb_companion for {self.udid}") | ||
| self.companion_process.kill() | ||
| await asyncio.to_thread(self.companion_process.wait) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error stopping idb_companion: {e}") | ||
| finally: | ||
| self.companion_process = None | ||
|
|
||
| def __del__(self): | ||
| if self.companion_process is not None: | ||
| try: | ||
| self.companion_process.terminate() | ||
| self.companion_process.wait(timeout=2) | ||
| except Exception: | ||
| try: | ||
| self.companion_process.kill() | ||
| except Exception: | ||
| pass | ||
|
|
||
| async def __aenter__(self): | ||
| await self.init_companion() | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
| await self.cleanup() | ||
| return False | ||
|
|
||
| @with_idb_client | ||
| async def tap(self, client: Client, x: int, y: int, duration: float | None = None) -> bool: | ||
| await client.tap(x=x, y=y, duration=duration) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def swipe( | ||
| self, client: Client, x_start: int, y_start: int, x_end: int, y_end: int, delta: int = 10 | ||
| ) -> bool: | ||
| await client.swipe(p_start=(x_start, y_start), p_end=(x_end, y_end), delta=delta) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def screenshot(self, client: Client, output_path: str | None = None) -> bytes | None: | ||
| """ | ||
| Take a screenshot and return raw image data. | ||
|
|
||
| Returns: | ||
| Raw image data (PNG bytes not base64 encoded) | ||
| """ | ||
| screenshot_data = await client.screenshot() | ||
| if output_path: | ||
| with open(output_path, "wb") as f: | ||
| f.write(screenshot_data) | ||
| return screenshot_data | ||
|
|
||
| @with_idb_client | ||
| async def launch( | ||
| self, | ||
| client: Client, | ||
| bundle_id: str, | ||
| args: list[str] | None = None, | ||
| env: dict[str, str] | None = None, | ||
| ) -> bool: | ||
| await client.launch( | ||
| bundle_id=bundle_id, args=args or [], env=env or {}, foreground_if_running=True | ||
| ) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def terminate(self, client: Client, bundle_id: str) -> bool: | ||
| await client.terminate(bundle_id) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def install(self, client: Client, app_path: str) -> list[InstalledArtifact] | None: | ||
| bundle_path = Path(app_path) | ||
| artifacts = [] | ||
| with open(bundle_path, "rb") as f: | ||
| async for artifact in client.install(bundle=f): | ||
| artifacts.append(artifact) | ||
| return artifacts | ||
|
|
||
| @with_idb_client | ||
| async def uninstall(self, client: Client, bundle_id: str) -> bool: | ||
| await client.uninstall(bundle_id) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def list_apps(self, client: Client) -> list[InstalledAppInfo] | None: | ||
| apps = await client.list_apps() | ||
| return apps | ||
|
|
||
| @with_idb_client | ||
| async def text(self, client: Client, text: str) -> bool: | ||
| await client.text(text) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def key(self, client: Client, key_code: int) -> bool: | ||
| await client.key(key_code) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def button(self, client: Client, button_type: HIDButtonType) -> bool: | ||
| await client.button(button_type=button_type) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def clear_keychain(self, client: Client) -> bool: | ||
| await client.clear_keychain() | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def open_url(self, client: Client, url: str) -> bool: | ||
| await client.open_url(url) | ||
| return True | ||
|
|
||
| @with_idb_client | ||
| async def describe_all(self, client: Client) -> dict[str, Any] | None: | ||
| accessibility_info = await client.accessibility_info(nested=True, point=None) | ||
| return json.loads(accessibility_info.json) | ||
|
|
||
| @with_idb_client | ||
| async def describe_point(self, client: Client, x: int, y: int) -> dict[str, Any] | None: | ||
| accessibility_info = await client.accessibility_info(point=(x, y), nested=True) | ||
| return json.loads(accessibility_info.json) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.