Created
January 20, 2026 01:41
-
-
Save opsec-ee/9f6635cd3dffec8b4eea53c9d5e4507a to your computer and use it in GitHub Desktop.
Mobile MCP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Euman - Mobile MCP:

```python
import base64
import hashlib
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
# ═══════════════════════════════════════════════════════════════════
# CLASSIFICATION
# ═══════════════════════════════════════════════════════════════════
class AppClassification(Enum):
    """Sensitivity tier of an app, from least (PUBLIC) to most (RESTRICTED).

    The integer values are ordered, so two tiers can be compared through
    their ``.value`` (a larger value means a more sensitive app).
    """

    PUBLIC = 0      # Calculator, weather
    INTERNAL = 1    # Notes, calendar
    SENSITIVE = 2   # Email, messages
    RESTRICTED = 3  # Banking, authenticator
# Known Android package name -> sensitivity tier.
# Packages that are not listed here are looked up with a RESTRICTED
# fallback (safe default), so an unknown app is never under-classified.
APP_CLASSIFICATIONS: dict[str, AppClassification] = {
    "com.android.calculator": AppClassification.PUBLIC,
    "com.android.calendar": AppClassification.INTERNAL,
    "com.google.android.gm": AppClassification.SENSITIVE,  # Gmail
    "com.chase.mobile": AppClassification.RESTRICTED,
    "com.google.android.apps.authenticator2": AppClassification.RESTRICTED,
    # Default: RESTRICTED (safe default)
}
# ═══════════════════════════════════════════════════════════════════
# CAPABILITY
# ═══════════════════════════════════════════════════════════════════
@dataclass
class MobileCapability:
    """Grant describing what one Mobile MCP session is allowed to do.

    ``allowed_apps`` and ``max_classification`` are required; every other
    field has a default. Text input (``can_type``) is the only action that
    is off by default.
    """

    # What apps can be accessed ("*" in the set matches every package)
    allowed_apps: set[str]
    # Highest sensitivity tier the session may touch
    max_classification: "AppClassification"

    # What actions are permitted
    can_screenshot: bool = True
    can_tap: bool = True
    can_type: bool = False  # Dangerous - default off
    can_swipe: bool = True

    # Egress control
    can_return_screenshots: bool = True
    redact_sensitive_regions: bool = True

    # Rate limiting (rolling 60-second window)
    actions_per_minute: int = 30
    screenshots_per_minute: int = 10

    # Quotas (lifetime of one session)
    max_screenshots_per_session: int = 100
    max_actions_per_session: int = 500

    # Audit
    audit_all_actions: bool = True
# ═══════════════════════════════════════════════════════════════════
# MOBILE MCP SERVER (SECURED)
# ═══════════════════════════════════════════════════════════════════
| class SecureMobileMCP: | |
| def __init__(self, capability: MobileCapability): | |
| self.cap = capability | |
| self.session_screenshots = 0 | |
| self.session_actions = 0 | |
| self.last_minute_actions = [] | |
| self.last_minute_screenshots = [] | |
| self.audit_log = [] | |
| def _get_current_app(self) -> str: | |
| """Get package name of foreground app""" | |
| # adb shell dumpsys activity | grep mCurrentFocus | |
| pass | |
| def _get_app_classification(self, package: str) -> AppClassification: | |
| return APP_CLASSIFICATIONS.get(package, AppClassification.RESTRICTED) | |
| def _check_rate_limit(self, action_type: str) -> bool: | |
| now = time.time() | |
| minute_ago = now - 60 | |
| if action_type == "screenshot": | |
| self.last_minute_screenshots = [ | |
| t for t in self.last_minute_screenshots if t > minute_ago | |
| ] | |
| if len(self.last_minute_screenshots) >= self.cap.screenshots_per_minute: | |
| return False | |
| self.last_minute_screenshots.append(now) | |
| else: | |
| self.last_minute_actions = [ | |
| t for t in self.last_minute_actions if t > minute_ago | |
| ] | |
| if len(self.last_minute_actions) >= self.cap.actions_per_minute: | |
| return False | |
| self.last_minute_actions.append(now) | |
| return True | |
| def _audit(self, action: str, result: str, details: dict): | |
| if self.cap.audit_all_actions: | |
| self.audit_log.append({ | |
| "timestamp": time.time(), | |
| "action": action, | |
| "result": result, | |
| "app": self._get_current_app(), | |
| "details": details, | |
| }) | |
| def _redact_screenshot(self, img_bytes: bytes) -> bytes: | |
| """Redact sensitive regions (password fields, etc.)""" | |
| if not self.cap.redact_sensitive_regions: | |
| return img_bytes | |
| # Use OCR to find sensitive fields | |
| # Draw black boxes over password inputs, etc. | |
| # This is where real implementation goes | |
| return img_bytes | |
| # ─────────────────────────────────────────────────────────────── | |
| # GATED TOOLS | |
| # ─────────────────────────────────────────────────────────────── | |
| def screenshot(self) -> dict: | |
| """Take screenshot with all Euman gates""" | |
| # Gate 1: Permission check | |
| if not self.cap.can_screenshot: | |
| self._audit("screenshot", "DENIED", {"reason": "capability"}) | |
| return {"error": "Screenshot not permitted by capability"} | |
| # Gate 2: App classification check | |
| current_app = self._get_current_app() | |
| app_class = self._get_app_classification(current_app) | |
| if app_class.value > self.cap.max_classification.value: | |
| self._audit("screenshot", "DENIED", { | |
| "reason": "classification", | |
| "app": current_app, | |
| "app_class": app_class.name, | |
| "max_class": self.cap.max_classification.name, | |
| }) | |
| return {"error": f"Cannot screenshot {app_class.name} apps"} | |
| # Gate 3: App whitelist check | |
| if current_app not in self.cap.allowed_apps and "*" not in self.cap.allowed_apps: | |
| self._audit("screenshot", "DENIED", { | |
| "reason": "scope", | |
| "app": current_app, | |
| }) | |
| return {"error": f"App {current_app} not in capability scope"} | |
| # Gate 4: Rate limit | |
| if not self._check_rate_limit("screenshot"): | |
| self._audit("screenshot", "DENIED", {"reason": "rate_limit"}) | |
| return {"error": "Screenshot rate limit exceeded"} | |
| # Gate 5: Quota check | |
| if self.session_screenshots >= self.cap.max_screenshots_per_session: | |
| self._audit("screenshot", "DENIED", {"reason": "quota"}) | |
| return {"error": "Screenshot quota exceeded"} | |
| # Gate 6: Egress check | |
| if not self.cap.can_return_screenshots: | |
| # Can take screenshot for internal use but not return it | |
| self._audit("screenshot", "DENIED", {"reason": "egress"}) | |
| return {"error": "Screenshot egress not permitted"} | |
| # ───────────────────────────────────────────────────────────── | |
| # ALL GATES PASSED | |
| # ───────────────────────────────────────────────────────────── | |
| img_bytes = self._capture_screen() | |
| # Gate 7: Redaction | |
| img_bytes = self._redact_screenshot(img_bytes) | |
| self.session_screenshots += 1 | |
| self._audit("screenshot", "ALLOWED", { | |
| "app": current_app, | |
| "size": len(img_bytes), | |
| }) | |
| return { | |
| "image": base64_encode(img_bytes), | |
| "app": current_app, | |
| "classification": app_class.name, | |
| } | |
| def tap(self, x: int, y: int) -> dict: | |
| """Tap with all Euman gates""" | |
| # Gate 1: Permission | |
| if not self.cap.can_tap: | |
| self._audit("tap", "DENIED", {"reason": "capability"}) | |
| return {"error": "Tap not permitted"} | |
| # Gate 2: App classification | |
| current_app = self._get_current_app() | |
| app_class = self._get_app_classification(current_app) | |
| if app_class.value > self.cap.max_classification.value: | |
| self._audit("tap", "DENIED", { | |
| "reason": "classification", | |
| "app": current_app, | |
| }) | |
| return {"error": f"Cannot interact with {app_class.name} apps"} | |
| # Gate 3: Scope | |
| if current_app not in self.cap.allowed_apps and "*" not in self.cap.allowed_apps: | |
| self._audit("tap", "DENIED", {"reason": "scope"}) | |
| return {"error": f"App {current_app} not in scope"} | |
| # Gate 4: Rate limit | |
| if not self._check_rate_limit("action"): | |
| self._audit("tap", "DENIED", {"reason": "rate_limit"}) | |
| return {"error": "Action rate limit exceeded"} | |
| # Gate 5: Quota | |
| if self.session_actions >= self.cap.max_actions_per_session: | |
| self._audit("tap", "DENIED", {"reason": "quota"}) | |
| return {"error": "Action quota exceeded"} | |
| # ALL GATES PASSED | |
| self._execute_tap(x, y) | |
| self.session_actions += 1 | |
| self._audit("tap", "ALLOWED", {"x": x, "y": y, "app": current_app}) | |
| return {"success": True} | |
| def type_text(self, text: str) -> dict: | |
| """Type text with EXTRA gates (most dangerous action)""" | |
| # Gate 1: Permission (default OFF) | |
| if not self.cap.can_type: | |
| self._audit("type_text", "DENIED", {"reason": "capability"}) | |
| return {"error": "Text input not permitted by capability"} | |
| # Gate 2: Classification (stricter for typing) | |
| current_app = self._get_current_app() | |
| app_class = self._get_app_classification(current_app) | |
| # Never allow typing into RESTRICTED apps | |
| if app_class == AppClassification.RESTRICTED: | |
| self._audit("type_text", "DENIED", { | |
| "reason": "classification", | |
| "app": current_app, | |
| }) | |
| return {"error": "Cannot type into restricted apps"} | |
| # Gate 3: Content check - no passwords | |
| if self._looks_like_password(text): | |
| self._audit("type_text", "DENIED", { | |
| "reason": "content", | |
| "text_hash": hashlib.sha256(text.encode()).hexdigest()[:16], | |
| }) | |
| return {"error": "Cannot type password-like content"} | |
| # ... other gates ... | |
| self._execute_type(text) | |
| self._audit("type_text", "ALLOWED", { | |
| "length": len(text), | |
| "app": current_app, | |
| }) | |
| return {"success": True} | |
| def _looks_like_password(self, text: str) -> bool: | |
| """Heuristic: is this probably a password?""" | |
| # Check if typing into password field (need UI state) | |
| # Check if text looks like password (entropy, special chars) | |
| # Check if field label contains "password" | |
| return False # Implement properly | |
```

```python
# Capability Profiles:
# Safe demo capability: two named apps, PUBLIC tier only, typing disabled.
DEMO_CAPABILITY = MobileCapability(
    allowed_apps={"com.android.calculator", "com.android.settings"},
    max_classification=AppClassification.PUBLIC,
    can_screenshot=True,
    can_tap=True,
    can_type=False,  # No typing
    can_swipe=True,
    screenshots_per_minute=5,
    actions_per_minute=20,
)

# Automation capability (more trust): any app up to the INTERNAL tier.
AUTOMATION_CAPABILITY = MobileCapability(
    allowed_apps={"*"},  # All apps
    max_classification=AppClassification.INTERNAL,
    can_screenshot=True,
    can_tap=True,
    can_type=True,  # Can type
    can_swipe=True,
    redact_sensitive_regions=True,  # But redact sensitive stuff
    screenshots_per_minute=30,
    actions_per_minute=60,
)

# NEVER expose this: every app, every tier, screenshots returned unredacted.
UNRESTRICTED_CAPABILITY = MobileCapability(
    allowed_apps={"*"},
    max_classification=AppClassification.RESTRICTED,
    can_screenshot=True,
    can_tap=True,
    can_type=True,
    redact_sensitive_regions=False,  # DANGEROUS
    can_return_screenshots=True,
)
```

### The Pattern

```
Mobile MCP (no gates):
─────────────────────────────
screenshot() → all screens
tap(x,y)     → anywhere
type_text()  → any field
no gates

Secure Mobile MCP (gated):
─────────────────────────────
screenshot() → allowed apps only
tap(x,y)     → allowed apps only
type_text()  → non-restricted only
+ classification
+ rate limits
+ quotas
+ redaction
+ audit
```
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment