Skip to content

Instantly share code, notes, and snippets.

@cgm999
Last active February 1, 2026 08:01
Show Gist options
  • Select an option

  • Save cgm999/c918dd832d91ed8f8b2d8a572d95bd55 to your computer and use it in GitHub Desktop.

Select an option

Save cgm999/c918dd832d91ed8f8b2d8a572d95bd55 to your computer and use it in GitHub Desktop.
openconnect_sso - browser_playwright
#!/bin/python3
# Requires a configured Python keyring (https://pypi.org/project/keyring/)
# $ keyring get "openconnect-sso" your_email
# $ keyring set "vpnadfscreds" "totp" # totpBase32Secret
import argparse
import os
import sys
import time
import pprint
from distutils.util import strtobool
from typing import Callable, Tuple, Any, Literal, TypeVar, Optional, List
from urllib.parse import urlparse
import keyring
import pyotp
import asyncio
from playwright.async_api import async_playwright, Page, Playwright, BrowserContext, Browser
_T = TypeVar("_T")


def require_non_null(value: Optional[_T], name: str) -> _T:
    """Return *value* unchanged, or raise if it is ``None``.

    Args:
        value: The value to check; any non-``None`` value (including falsy
            ones such as ``0`` or ``""``) is accepted.
        name: Label used in the error message.

    Raises:
        ValueError: If *value* is ``None``.
    """
    if value is not None:
        return value
    raise ValueError(f"{name} must not be null")
class AppArgs:
    """Settings for one login run: target server, cookie name and account."""

    def __init__(self, server: str, cookie_name: str, email: str, headless: bool) -> None:
        """Store the given settings and derive the script name and server host.

        Args:
            server: URL of the sign-in page; kept as ``server`` and its
                network-location part as ``server_domain``.
            cookie_name: Kept as ``cookie_name``.
            email: Kept as ``email``.
            headless: Kept as ``headless``.
        """
        super().__init__()
        # All values are handed in by the caller; no command line is parsed here.
        file_name = os.path.basename(__file__)
        stem, _extension = os.path.splitext(file_name)
        self.script_basename: str = stem
        self.server: str = server
        self.email: str = email
        self.cookie_name: str = cookie_name
        # The engine is fixed to firefox; the Literal lists the other accepted names.
        self.browser: Literal["chromium", "firefox", "webkit"] = "firefox"
        self.headless: bool = headless
        self.server_domain: str = urlparse(server).netloc
class Credentials:
    """Login secrets read from the system keyring for the configured account."""

    def __init__(self, app_args: AppArgs) -> None:
        """Load the password stored under service ``openconnect-sso``.

        Args:
            app_args: Supplies ``email``, used as the keyring user name.

        Raises:
            ValueError: If the keyring holds no password for that account.
        """
        super().__init__()
        # keyring.get_password() yields None for a missing entry. Fail here with
        # a clear message instead of handing None to the login form later on.
        self.password: str = require_non_null(
            keyring.get_password("openconnect-sso", app_args.email), "password"
        )
        # The TOTP lookup is disabled in this version of the script.
        # NOTE(review): TaskLoop's one-time-code step calls `credentials.totp.now()`;
        # confirm the credentials object actually passed to it provides that.
        # self.totp: pyotp.TOTP = pyotp.TOTP(require_non_null(keyring.get_password("vpnadfscreds", "totp"), "totp"))
class Browser4:
    """Owns the Playwright driver, browser, context and page used for the login.

    Construction only stores settings. The browser is started by
    ``async_init()``, which also runs when the instance itself is awaited
    (``browser4 = await Browser4(app_args)``).
    """

    def __init__(self, app_args: "AppArgs") -> None:
        """Remember the settings; no browser is started yet.

        Args:
            app_args: Supplies ``browser``, the engine name to launch.
        """
        super().__init__()
        self.app_args = app_args
        # Applied to both actions and navigations of the context (seconds).
        self.default_timeout_seconds: float = 15

    async def async_init(self) -> "Browser4":
        """Start Playwright, launch the browser and open a single page.

        Returns:
            This instance, with ``playwright``, ``browser``, ``context`` and
            ``page`` set.

        Raises:
            Any error raised while starting up; whatever had already been
            started is shut down before the error propagates.
        """
        try:
            self.playwright: Playwright = await async_playwright().start()
            self.browser: Browser = await self.playwright[self.app_args.browser].launch(
                # app_args.headless is not applied here: the window is always shown.
                headless=False,
                # Presumably keeps the Alt key from focusing Firefox's menu bar -- TODO confirm.
                firefox_user_prefs={'ui.key.menuAccessKeyFocuses': False},
            )
            # No stored session state is loaded; every run starts with a fresh context.
            self.context: BrowserContext = await self.browser.new_context(locale="en-US")
            timeout_ms = self.default_timeout_seconds * 1000
            self.context.set_default_timeout(timeout_ms)
            self.context.set_default_navigation_timeout(timeout_ms)
            self.page: Page = await self.context.new_page()
        except BaseException:
            # Do not leave a driver or browser process behind. The start-up
            # error is the one worth reporting, so a secondary failure while
            # cleaning up is deliberately dropped.
            try:
                await self.close()
            except Exception:
                pass
            raise
        return self

    def __await__(self):
        """Make ``await Browser4(...)`` run ``async_init()`` and yield the instance."""
        return self.async_init().__await__()

    async def close(self) -> None:
        """Close page, context and browser, then stop the Playwright driver.

        Every resource that was created is released even if closing an
        earlier one fails; resources that were never created are skipped.

        Raises:
            The first error met while closing, after all resources were tried.
        """
        first_error = None
        shutdown_order = (
            ("page", "close"),
            ("context", "close"),
            ("browser", "close"),
            ("playwright", "stop"),
        )
        for attribute, method_name in shutdown_order:
            resource = getattr(self, attribute, None)
            if resource is None:
                continue
            try:
                await getattr(resource, method_name)()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
async def wait_for_condition(call_id: str,
                             predicate_supplier: Callable[[], Any],
                             timeout_seconds: float,
                             delay_seconds: float,
                             initial_delay_seconds: float,
                             page: "Page") -> None:
    """Poll *predicate_supplier* until it reports a truthy value.

    Args:
        call_id: Label used in the progress and timeout messages.
        predicate_supplier: Zero-argument callable. It may return the result
            directly or return an awaitable (e.g. be an ``async def``), in
            which case the awaited value is used.
        timeout_seconds: Polling budget; together with *delay_seconds* it
            bounds the number of attempts.
        delay_seconds: Pause between attempts; must be positive.
        initial_delay_seconds: Pause before the first attempt (skipped if <= 0).
        page: Object whose ``wait_for_timeout(ms)`` is awaited for the pauses.

    Raises:
        ValueError: If *delay_seconds* is not positive.
        TimeoutError: If the condition never became truthy.
        Exception: Any error from the predicate other than Playwright's base
            ``Error``, which is treated as "not yet" and retried.
    """
    import inspect  # local import: used only here

    if delay_seconds <= 0:
        raise ValueError("delay_seconds must be positive")
    attempts_max = timeout_seconds / delay_seconds
    if initial_delay_seconds > 0:
        await page.wait_for_timeout(initial_delay_seconds * 1000)
    attempts_cnt = 0
    while attempts_cnt <= attempts_max:
        try:
            outcome = predicate_supplier()
            if inspect.isawaitable(outcome):
                # With the async Playwright API every page query is a coroutine.
                outcome = await outcome
            if outcome:
                return
            print(f"retrying wait_for_condition {call_id}", file=sys.stderr)
            await page.wait_for_timeout(delay_seconds * 1000)
        except Exception as exc:
            # Retry only on Playwright's base Error (raised e.g. when the page
            # navigates mid-query). The class is matched by name and package
            # because its private module path is not stable across Playwright
            # releases (e.g. `_api_types` vs `_errors`).
            error_cls = type(exc)
            if not (error_cls.__name__ == "Error"
                    and error_cls.__module__.startswith("playwright.")):
                raise
            print(f"retrying wait_for_condition {call_id}, met an exception on the way", file=sys.stderr)
            await page.wait_for_timeout(delay_seconds * 1000)
        attempts_cnt += 1
    raise TimeoutError(f"wait_for_condition {call_id} timed out")
class FixedValuesBackOff:
    """Hands out delays from a fixed list, repeating the last one once exhausted."""

    def __init__(self, values: List[float]) -> None:
        """Create a back-off over *values*.

        Args:
            values: Delays in the order they are handed out; must not be empty.

        Raises:
            ValueError: If *values* is empty.
        """
        super().__init__()
        # Copy so later changes to the caller's list cannot alter the schedule.
        schedule = list(values)
        if not schedule:
            raise ValueError("values must not be empty")
        self.values: List[float] = schedule
        # Index of the value handed out last; -1 means none yet.
        self.index: int = -1

    def next_back_off(self) -> float:
        """Return the next delay; the final value repeats once the list is used up."""
        self.index += 1
        return self.values[min(self.index, len(self.values) - 1)]

    def reset(self) -> None:
        """Start over so the next call returns the first value again."""
        self.index = -1
class TaskLoop:
    """Recognises the current sign-in page and performs the single next step on it.

    ``run_next()`` returns one of the result codes below plus an optional payload.
    """

    ALL_DONE_COOKIE_FOUND = 0  # the wanted cookie is present; payload is its value
    TASK_DONE = 1              # a known page was recognised and acted upon
    EMPTY_MILE = 2             # nothing recognisable on the current page
    MFA_FAILED = 3             # the one-time code was not accepted

    _MSFT_LOGIN_HOST = "login.microsoftonline.com"
    _OTP_INPUT = "[name='otc']"
    _OTP_ERROR = "#idSpan_SAOTCC_Error_OTC"
    _MFA_POLL_SECONDS = 0.3

    def __init__(self, app_args: "AppArgs", credentials: "Credentials", browser4: "Browser4") -> None:
        """Keep references to the settings, the secrets and the running browser."""
        super().__init__()
        self.app_args: AppArgs = app_args
        self.credentials: Credentials = credentials
        self.browser4: Browser4 = browser4

    async def _vpn_page_shows(self, page, text: str) -> bool:
        """Tell whether the page is on the VPN server and its HTML contains *text*."""
        return self.app_args.server_domain in page.url and text in await page.content()

    async def _msft_field_visible(self, page, selector: str) -> bool:
        """Tell whether the page is on the Microsoft login host and *selector* is visible."""
        return self._MSFT_LOGIN_HOST in page.url and await page.is_visible(selector)

    async def _mfa_code_accepted(self, page) -> bool:
        """Poll until the submitted one-time code is either accepted or rejected.

        Returns:
            True once the code input is gone, False once the code error
            element is present while the input is still there.

        Raises:
            TimeoutError: If neither happens within the browser's default timeout.
        """
        delay_ms = self._MFA_POLL_SECONDS * 1000
        attempts_max = self.browser4.default_timeout_seconds / self._MFA_POLL_SECONDS
        # Initial delay may be useful for the invalid code error to be reset. (?)
        await page.wait_for_timeout(delay_ms)
        attempts_cnt = 0
        while attempts_cnt <= attempts_max:
            try:
                if not await page.query_selector_all(self._OTP_INPUT):
                    return True
                if await page.query_selector_all(self._OTP_ERROR):
                    return False
                print("retrying wait_for_condition mfa_check_is_ok", file=sys.stderr)
            except Exception as exc:
                # A navigation during the query surfaces as Playwright's base
                # Error; treat that as "not decided yet". Matched by name and
                # package because the private module path is not stable
                # across Playwright releases.
                error_cls = type(exc)
                if not (error_cls.__name__ == "Error"
                        and error_cls.__module__.startswith("playwright.")):
                    raise
                print("retrying wait_for_condition mfa_check_is_ok, met an exception on the way",
                      file=sys.stderr)
            await page.wait_for_timeout(delay_ms)
            attempts_cnt += 1
        raise TimeoutError("wait_for_condition mfa_check_is_ok timed out")

    async def run_next(self) -> Tuple[int, Any]:
        """Inspect the current page and carry out at most one sign-in step.

        Returns:
            ``(code, payload)`` where *code* is one of the class constants and
            *payload* is the cookie value for ``ALL_DONE_COOKIE_FOUND``,
            otherwise ``None``.
        """
        page = self.browser4.page
        await page.wait_for_load_state()

        if await self._vpn_page_shows(page, "No Assertion Received. Please sign in again."):
            print("VPN Sign-In Page, 'No Assertion Received' error", file=sys.stderr)
            await page.click("input[type='submit']")
            return TaskLoop.TASK_DONE, None

        if await self._vpn_page_shows(page, "Pre Sign-In Notification"):
            print("VPN Sign-In Page, Pre Sign-In Notification", file=sys.stderr)
            await page.click("[name='sn-preauth-proceed']")
            return TaskLoop.TASK_DONE, None

        if await self._vpn_page_shows(page, "Host Checker"):
            print("VPN Sign-In Page, Host Checker", file=sys.stderr)
            await page.click("#continue a")
            return TaskLoop.TASK_DONE, None

        # Email input is always "visible", but may be off-screen.
        if await self._msft_field_visible(page, "[name='loginfmt']:not(.moveOffScreen)"):
            await page.fill("[name='loginfmt']", self.app_args.email)
            await page.click("input[type='submit']")
            # do not return - continue with password

        if await self._msft_field_visible(page, "[name='passwd']"):
            await page.fill("[name='passwd']", self.credentials.password)
            await page.click("input[type='submit']")
            return TaskLoop.TASK_DONE, None

        if await self._msft_field_visible(page, self._OTP_INPUT):
            print("msft sso, Sign in to your account, Enter code", file=sys.stderr)
            # NOTE(review): requires `credentials.totp` to expose `.now()`;
            # confirm for the credentials object the caller passes in.
            await page.fill("[name='otc']", self.credentials.totp.now())
            if await page.is_enabled("[name='rememberMFA']"):  # yes, may be disabled
                await page.check("[name='rememberMFA']")
            await page.click("input[type='submit']")
            # MFA input will disappear or an invalid code error will appear.
            if await self._mfa_code_accepted(page):
                return TaskLoop.TASK_DONE, None
            # Sometimes the MFA code is not accepted. I don't know why, it just
            # remains to report the state.
            return TaskLoop.MFA_FAILED, None

        if await self._msft_field_visible(page, "[name='DontShowAgain']"):
            print("msft sso, Sign in to your account, Stay signed in?", file=sys.stderr)
            await page.check("[name='DontShowAgain']")
            await page.click("input[type='submit']")
            return TaskLoop.TASK_DONE, None

        if await self._vpn_page_shows(page, "You have reached the maximum number of open user sessions"):
            print("VPN Sign-In Page, Confirmation Open Sessions", file=sys.stderr)
            await page.check("[name='postfixSID']")
            await page.click("[name='btnContinue']")
            return TaskLoop.TASK_DONE, None

        if await self._vpn_page_shows(page, "There are already other user sessions in progress"):
            print("VPN Sign-In Page, Confirmation Open Sessions", file=sys.stderr)
            await page.click("[name='btnContinue']")
            return TaskLoop.TASK_DONE, None

        if self.app_args.server_domain in page.url:
            # Look for the cookie the caller asked for (sso-v2-token-cookie-name,
            # e.g. acSamlv2Token) among the cookies sent to the server URL.
            cookies = await self.browser4.context.cookies(self.app_args.server)
            for cookie in cookies:
                if cookie["name"] == self.app_args.cookie_name:
                    return TaskLoop.ALL_DONE_COOKIE_FOUND, f"{cookie['value']}"

        return TaskLoop.EMPTY_MILE, None
async def get_token(proxy, auth_info, credentials, display_mode):
    """Drive a browser through the SSO sign-in and return the session cookie value.

    Args:
        proxy: Not used by this implementation.
        auth_info: Supplies ``login_url`` (page to open) and
            ``token_cookie_name`` (cookie to wait for).
        credentials: Supplies ``username``; also handed to ``TaskLoop`` for
            the password and one-time-code steps.
        display_mode: Stored as ``AppArgs.headless``.

    Returns:
        The value of the cookie named ``auth_info.token_cookie_name``.

    Raises:
        TimeoutError: If the cookie has not shown up once 60 seconds have
            elapsed (checked before each step).
        Exception: Any error from a step other than Playwright's base
            ``Error``, which is treated as a transient page reload and retried.
    """
    app_args = AppArgs(auth_info.login_url, auth_info.token_cookie_name, credentials.username, display_mode)
    browser4 = await Browser4(app_args)
    start_time = time.time()
    max_time_seconds = 60
    # Pauses (seconds) used after consecutive Playwright errors.
    empty_miles = FixedValuesBackOff([2, 2, 2, 2, 2, 2, 2, 6])
    cookie_val = ""
    try:
        await browser4.page.goto(app_args.server, wait_until="networkidle")
        task_loop = TaskLoop(app_args, credentials, browser4)
        while True:
            try:
                elapsed_time = time.time() - start_time
                if elapsed_time > max_time_seconds:
                    raise TimeoutError(f"{app_args.script_basename} timed out")
                result, payload = await task_loop.run_next()
                if result == TaskLoop.TASK_DONE:
                    empty_miles.reset()
                    await browser4.page.wait_for_load_state("networkidle")
                    await browser4.page.wait_for_timeout(timeout=100)
                elif result == TaskLoop.EMPTY_MILE:
                    delay_seconds = 0.1
                    await browser4.page.wait_for_load_state("networkidle")
                    await browser4.page.wait_for_timeout(timeout=delay_seconds * 1000)
                elif result == TaskLoop.MFA_FAILED:
                    print("retrying mfa step, but before that 6 seconds rest", file=sys.stderr)
                    await browser4.page.wait_for_load_state("networkidle")
                    await browser4.page.wait_for_timeout(6 * 1000)
                elif result == TaskLoop.ALL_DONE_COOKIE_FOUND:
                    cookie_val = payload  # cookie
                    break
                else:
                    raise AssertionError("unknown TaskLoop result")
            except Exception as exc:
                # Example exceptions:
                # - Execution context was destroyed, most likely because of a navigation.
                # - Protocol error (Runtime.getProperties): Cannot find context with specified id
                # These can occur when a page reload happens during processing.
                # Only Playwright's base Error is retried; it is matched by name
                # and package because its private module path is not stable
                # across Playwright releases (e.g. `_api_types` vs `_errors`).
                error_cls = type(exc)
                if not (error_cls.__name__ == "Error"
                        and error_cls.__module__.startswith("playwright.")):
                    raise
                delay_seconds = empty_miles.next_back_off()
                print(f"drove an exceptionally empty mile, pause for {delay_seconds:.1f}s", file=sys.stderr)
                await browser4.page.wait_for_timeout(timeout=delay_seconds * 1000)
    finally:
        await browser4.close()
    elapsed_time = time.time() - start_time
    print(f"browser_playwright.py done, elapsed time {elapsed_time:.1f}s", file=sys.stderr)
    return cookie_val
@cgm999
Copy link
Author

cgm999 commented Feb 1, 2026

A Playwright-based module (adapted from existing VPN-login code found on GitHub) that replaces the PyQt browser, which had issues for me, in the openconnect SSO helper: https://github.com/PrestonHager/openconnect-sso/

Put the gist file into openconnect_sso/browser_playwright.py

Also replace openconnect_sso/saml_authenticator.py with the content below:

linux# cat /usr/lib/python3.14/site-packages/openconnect_sso/saml_authenticator.py
import structlog

from openconnect_sso.browser_playwright import get_token
log = structlog.get_logger()
import pprint
import sys
async def authenticate_in_browser(proxy, auth_info, credentials, display_mode):
    return await get_token(proxy, auth_info, credentials, display_mode)

Install the browser you plan to use. In my case that is Firefox, but as you can see in the script, it can be changed.
bash$ playwright install firefox

For Python 3.14, we need to create the asyncio event loop explicitly:

linux# diff -u /usr/lib/python3.14/site-packages/openconnect_sso/app.py.bak /usr/lib/python3.14/site-packages/openconnect_sso/app.py
--- /usr/lib/python3.14/site-packages/openconnect_sso/app.py.bak        2026-01-12 09:08:29.000000000 +0200
+++ /usr/lib/python3.14/site-packages/openconnect_sso/app.py    2026-01-12 09:26:33.491158827 +0200
@@ -32,6 +32,7 @@
     try:
         if os.name == "nt":
             asyncio.set_event_loop(asyncio.ProactorEventLoop())
+        asyncio.set_event_loop(asyncio.new_event_loop())
         auth_response, selected_profile = asyncio.get_event_loop().run_until_complete(
             _run(args, cfg)
         )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment