Last active
February 1, 2026 08:01
-
-
Save cgm999/c918dd832d91ed8f8b2d8a572d95bd55 to your computer and use it in GitHub Desktop.
openconnect_sso - browser_playwright
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/python3 | |
| # required configured python keyring (https://pypi.org/project/keyring/) | |
| # $ keyring get "openconnect-sso" your_email | |
| # $ keyring set "vpnadfscreds" "totp" # totpBase32Secret | |
| import argparse | |
| import os | |
| import sys | |
| import time | |
| import pprint | |
| from distutils.util import strtobool | |
| from typing import Callable, Tuple, Any, Literal, TypeVar, Optional, List | |
| from urllib.parse import urlparse | |
| import keyring | |
| import pyotp | |
| import asyncio | |
| from playwright.async_api import async_playwright, Page, Playwright, BrowserContext, Browser | |
| _T = TypeVar('_T') | |
| def require_non_null(value: Optional[_T], name: str) -> _T: | |
| if value is None: | |
| raise ValueError(f"{name} must not be null") | |
| return value | |
| class AppArgs: | |
| def __init__(self,server: str,cookie_name: str,email: str,headless: bool) -> None: | |
| """AppArgs.""" | |
| super().__init__() | |
| #args_parser = argparse.ArgumentParser() | |
| #args_parser.add_argument("--email", required=True, type=str) | |
| #args_parser.add_argument("--cookie_name", required=True, type=str) | |
| #args_parser.add_argument("--server", required=True, type=str) | |
| #args_parser.add_argument('--browser', required=False, default="firefox", choices=["chromium", "firefox", "webkit"]) | |
| #args_parser.add_argument('--headless', required=False,default=False, type=lambda x: bool(strtobool(str(x)))) | |
| #args = args_parser.parse_args() | |
| self.script_basename: str = os.path.splitext(os.path.basename(__file__))[0] | |
| self.server: str = server | |
| self.email: str = email | |
| self.cookie_name: str = cookie_name | |
| self.browser: Literal["chromium", "firefox", "webkit"] = "firefox" | |
| self.headless: bool = headless | |
| server_parsed = urlparse(self.server) | |
| self.server_domain: str = server_parsed.netloc | |
| class Credentials: | |
| def __init__(self,app_args: AppArgs) -> None: | |
| """Credentials.""" | |
| super().__init__() | |
| self.password: str = keyring.get_password("openconnect-sso", app_args.email) | |
| #self.password: str = "test123" | |
| # self.totp: pyotp.TOTP = pyotp.TOTP(require_non_null(keyring.get_password("vpnadfscreds", "totp"), "totp")) | |
| class Browser4: | |
| #async def __init__(self, app_args: AppArgs) -> None: | |
| def __init__(self, app_args: AppArgs): | |
| """Browser4.""" | |
| super().__init__() | |
| self.app_args = app_args | |
| self.default_timeout_seconds: float = 15 | |
| script_dir = os.path.dirname(os.path.realpath(__file__)) | |
| #self.state_file: str = os.path.join(script_dir, f"secrets/{app_args.script_basename}-state.json") | |
| #init_state_file = self.state_file if os.path.exists(self.state_file) else None | |
| async def async_init(self): | |
| self.playwright: Playwright = await async_playwright().start() | |
| self.browser: Browser = await self.playwright[self.app_args.browser].launch( | |
| #headless=self.app_args.headless, | |
| headless=False, | |
| firefox_user_prefs={'ui.key.menuAccessKeyFocuses': False}, | |
| #args=["--kiosk"], | |
| #args=["-window-size 800x600"], | |
| ) | |
| self.context: BrowserContext = await self.browser.new_context(locale="en-US")#, storage_state=init_state_file) | |
| self.context.set_default_timeout(self.default_timeout_seconds * 1000) | |
| self.context.set_default_navigation_timeout(self.default_timeout_seconds * 1000) | |
| self.page: Page = await self.context.new_page() | |
| return self | |
| def __await__(self): | |
| return self.async_init().__await__() | |
| #def store_session(self): | |
| # self.context.storage_state(path=self.state_file) | |
| async def close(self): | |
| await self.page.close() | |
| await self.context.close() | |
| await self.browser.close() | |
| await self.playwright.stop() | |
| async def wait_for_condition(call_id: str, | |
| predicate_supplier: Callable[[], bool], | |
| timeout_seconds: float, | |
| delay_seconds: float, | |
| initial_delay_seconds: float, | |
| page: Page) -> None: | |
| condition_ok = False | |
| attempts_cnt = 0 | |
| attempts_max = timeout_seconds / delay_seconds | |
| if initial_delay_seconds > 0: | |
| await page.wait_for_timeout(initial_delay_seconds * 1000) | |
| while not condition_ok and attempts_cnt <= attempts_max: | |
| try: | |
| condition_ok = predicate_supplier() | |
| if not condition_ok: | |
| print(f"retrying wait_for_condition {call_id}", file=sys.stderr) | |
| await page.wait_for_timeout(delay_seconds * 1000) | |
| attempts_cnt += 1 | |
| except Exception as e: | |
| exc_type, _, _ = sys.exc_info() | |
| # See main() for rationale. There is a similar code. | |
| if "playwright._impl._api_types.Error" in str(exc_type): | |
| print(f"retrying wait_for_condition {call_id}, met an exception on the way", file=sys.stderr) | |
| await page.wait_for_timeout(delay_seconds * 1000) | |
| attempts_cnt += 1 | |
| else: | |
| raise e | |
| if not condition_ok: | |
| raise TimeoutError(f"wait_for_condition {call_id} timed out") | |
| class FixedValuesBackOff: | |
| def __init__(self, values: List[int]) -> None: | |
| """FixedValuesBackOff.""" | |
| super().__init__() | |
| self.values: List[int] = values | |
| self.index: int = -1 | |
| def next_back_off(self) -> int: | |
| self.index += 1 | |
| return self.values[min(self.index, len(self.values) - 1)] | |
| def reset(self) -> None: | |
| self.index = -1 | |
| class TaskLoop: | |
| ALL_DONE_COOKIE_FOUND = 0 | |
| TASK_DONE = 1 | |
| EMPTY_MILE = 2 | |
| MFA_FAILED = 3 | |
| def __init__(self, app_args: AppArgs, credentials: Credentials, browser4: Browser4) -> None: | |
| """TaskLoop.""" | |
| super().__init__() | |
| self.app_args: AppArgs = app_args | |
| self.credentials: Credentials = credentials | |
| self.browser4: Browser4 = browser4 | |
| async def run_next(self) -> Tuple[int, Any]: | |
| page = self.browser4.page | |
| await page.wait_for_load_state() | |
| if self.app_args.server_domain in page.url \ | |
| and "No Assertion Received. Please sign in again." in await page.content(): | |
| print("VPN Sign-In Page, 'No Assertion Received' error", file=sys.stderr) | |
| await page.click("input[type='submit']") | |
| return TaskLoop.TASK_DONE, None | |
| if self.app_args.server_domain in page.url \ | |
| and "Pre Sign-In Notification" in await page.content(): | |
| print("VPN Sign-In Page, Pre Sign-In Notification", file=sys.stderr) | |
| await page.click("[name='sn-preauth-proceed']") | |
| return TaskLoop.TASK_DONE, None | |
| if self.app_args.server_domain in page.url and \ | |
| "Host Checker" in await page.content(): | |
| print("VPN Sign-In Page, Host Checker", file=sys.stderr) | |
| await page.click("#continue a") | |
| return TaskLoop.TASK_DONE, None | |
| # Email input is always "visible", but may be off-screen. | |
| if "login.microsoftonline.com" in page.url \ | |
| and await page.is_visible("[name='loginfmt']:not(.moveOffScreen)"): # msft sso | |
| #print("msft sso, Sign in to your account, Enter email", file=sys.stderr) | |
| await page.fill("[name='loginfmt']", self.app_args.email) | |
| await page.click("input[type='submit']") | |
| # do not return - continue with password | |
| if "login.microsoftonline.com" in page.url \ | |
| and await page.is_visible("[name='passwd']"): # msft sso | |
| #print("msft sso, Sign in to your account, Enter password", file=sys.stderr) | |
| await page.fill("[name='passwd']", self.credentials.password) | |
| await page.click("input[type='submit']") | |
| return TaskLoop.TASK_DONE, None | |
| if "login.microsoftonline.com" in page.url \ | |
| and await page.is_visible("[name='otc']"): # msft sso | |
| print("msft sso, Sign in to your account, Enter code", file=sys.stderr) | |
| await page.fill("[name='otc']", self.credentials.totp.now()) | |
| if page.is_enabled("[name='rememberMFA']"): # yes, may be disabled | |
| await page.check("[name='rememberMFA']") | |
| await page.click("input[type='submit']") | |
| # This case is too complicated to be solved by the built-in wait_for methods. | |
| # MFA input will disappear or an invalid code error will appear. | |
| wait_for_condition( | |
| "mfa_check_is_ok", | |
| lambda: (len(page.query_selector_all("[name='otc']")) == 0 | |
| or len(page.query_selector_all("#idSpan_SAOTCC_Error_OTC")) > 0), | |
| timeout_seconds=self.browser4.default_timeout_seconds, | |
| delay_seconds=0.3, | |
| # Initial delay may be useful for the invalid code error to be reset. (?) | |
| initial_delay_seconds=0.3, | |
| page=page | |
| ) | |
| mfa_ok = len(page.query_selector_all("[name='otc']")) == 0 | |
| if not mfa_ok: | |
| # len(page.query_selector_all("#idSpan_SAOTCC_Error_OTC")) > 0 condition is met | |
| # Sometimes the MFA code is not accepted. I don't know why, it just remains to report the state. | |
| return TaskLoop.MFA_FAILED, None | |
| return TaskLoop.TASK_DONE, None | |
| if "login.microsoftonline.com" in page.url \ | |
| and await page.is_visible("[name='DontShowAgain']"): # msft sso | |
| print("msft sso, Sign in to your account, Stay signed in?", file=sys.stderr) | |
| await page.check("[name='DontShowAgain']") | |
| await page.click("input[type='submit']") | |
| return TaskLoop.TASK_DONE, None | |
| if self.app_args.server_domain in page.url and \ | |
| "You have reached the maximum number of open user sessions" in await page.content(): | |
| print("VPN Sign-In Page, Confirmation Open Sessions", file=sys.stderr) | |
| await page.check("[name='postfixSID']") | |
| await page.click("[name='btnContinue']") | |
| return TaskLoop.TASK_DONE, None | |
| if self.app_args.server_domain in page.url and \ | |
| "There are already other user sessions in progress" in await page.content(): | |
| print("VPN Sign-In Page, Confirmation Open Sessions", file=sys.stderr) | |
| await page.click("[name='btnContinue']") | |
| return TaskLoop.TASK_DONE, None | |
| if self.app_args.server_domain in page.url: | |
| #print("Got here , have this cookies", file=sys.stderr) | |
| cookies = await self.browser4.context.cookies(self.app_args.server) | |
| #pprint.pprint(cookies) | |
| # sso-v2-token-cookie-name acSamlv2Token | |
| cookie = next(filter(lambda c: c["name"] == self.app_args.cookie_name, cookies), None) | |
| if cookie is not None: | |
| #print("reading the *needed* cookie", file=sys.stderr) | |
| payload = f"{cookie['value']}" | |
| return TaskLoop.ALL_DONE_COOKIE_FOUND, payload | |
| return TaskLoop.EMPTY_MILE, None | |
| async def get_token(proxy, auth_info, credentials, display_mode): | |
| app_args = AppArgs(auth_info.login_url,auth_info.token_cookie_name,credentials.username,display_mode) | |
| #print(f"{app_args.script_basename} started", file=sys.stderr) | |
| #print("initializing the script, reading credentials", file=sys.stderr) | |
| #credentials = Credentials(app_args) | |
| #print("initializing the script, opening the browser", file=sys.stderr) | |
| browser4 = await Browser4(app_args) | |
| start_time = time.time() | |
| max_time_seconds = 60 | |
| #empty_miles = FixedValuesBackOff([0.2, 0.2, 0.2, 0.4, 0.4, 0.6, 2, 6]) | |
| empty_miles = FixedValuesBackOff([2,2,2,2,2,2, 2, 6]) | |
| cookie_val = "" | |
| try: | |
| await browser4.page.goto(app_args.server, wait_until="networkidle") | |
| task_loop = TaskLoop(app_args, credentials, browser4) | |
| while True: | |
| try: | |
| elapsed_time = time.time() - start_time | |
| if elapsed_time > max_time_seconds: | |
| raise TimeoutError(f"{app_args.script_basename} timed out") | |
| #print(f"{app_args.script_basename} timed out", file=sys.stderr) ; break | |
| result, payload = await task_loop.run_next() | |
| if result == TaskLoop.TASK_DONE: | |
| empty_miles.reset() | |
| await browser4.page.wait_for_load_state("networkidle") | |
| await browser4.page.wait_for_timeout(timeout=100) | |
| elif result == TaskLoop.EMPTY_MILE: | |
| #delay_seconds = empty_miles.next_back_off() | |
| delay_seconds = 0.1 | |
| #print(f"drove an empty mile, pause for {delay_seconds:.1f}s", file=sys.stderr) | |
| await browser4.page.wait_for_load_state("networkidle") | |
| await browser4.page.wait_for_timeout(timeout=delay_seconds * 1000) | |
| elif result == TaskLoop.MFA_FAILED: | |
| print("retrying mfa step, but before that 6 seconds rest", file=sys.stderr) | |
| await browser4.page.wait_for_load_state("networkidle") | |
| await browser4.page.wait_for_timeout(6 * 1000) | |
| elif result == TaskLoop.ALL_DONE_COOKIE_FOUND: | |
| cookie_val = payload # cookie | |
| break | |
| else: | |
| raise AssertionError("unknown TaskLoop result") | |
| except Exception as e: | |
| exc_type, _, _ = sys.exc_info() | |
| # Example exceptions: | |
| # - Execution context was destroyed, most likely because of a navigation." | |
| # - Protocol error (Runtime.getProperties): Cannot find context with specified id" | |
| # These exceptions can occur when a page reload occurs during processing. | |
| if "playwright._impl._api_types.Error" in str(exc_type): | |
| delay_seconds = empty_miles.next_back_off() | |
| print(f"drove an exceptionally empty mile, pause for {delay_seconds:.1f}s", file=sys.stderr) | |
| await rowser4.page.wait_for_timeout(timeout=delay_seconds * 1000) | |
| else: | |
| raise e | |
| #print("storing the session", file=sys.stderr) | |
| #browser4.store_session() | |
| finally: | |
| await browser4.close() | |
| elapsed_time = time.time() - start_time | |
| print(f"browser_playwright.py done, elapsed time {elapsed_time:.1f}s", file=sys.stderr) | |
| return(cookie_val) | |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Playwright module(based on existing code found in github for vpn login ) instead of pyQt (which for me had issues) for openconnect SSO helper https://github.com/PrestonHager/openconnect-sso/
Put the gist file into openconnect_sso/browser_playwright.py
Replace also openconnect_sso/saml_authenticator.py with below content:
install which browser you plan to use, in my case is firefox, however you can see in the script it can be changed
bash$ playwright install firefoxfor python 3.14 , we need to create the async event loop: