-
-
Save shaarkys/cee47895f1a11c52a7b65e1380b01821 to your computer and use it in GitHub Desktop.
This script logs into your De'Longhi account, retrieves an authentication token, and then fetches information about your registered coffee machines or other De'Longhi comfort devices.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import base64 | |
| from datetime import datetime | |
| import urllib.parse | |
| import json | |
| import tempfile | |
| import os | |
| import sys | |
| import getpass | |
| import argparse | |
| import requests | |
| try: | |
| from cremalink import Client as CremaClient | |
| except Exception: | |
| CremaClient = None | |
| TOKEN_FILE_DEFAULT = "delonghi_token.json" | |
| def _ensure_ok_json(resp: requests.Response, context: str) -> dict: | |
| try: | |
| resp.raise_for_status() | |
| except requests.HTTPError as e: | |
| body = resp.text | |
| if body and len(body) > 2000: | |
| body = body[:2000] + "…(truncated)" | |
| raise RuntimeError(f"[HTTP] {context}: {e} | body={body}") from e | |
| try: | |
| return resp.json() | |
| except Exception as e: | |
| body = resp.text | |
| if body and len(body) > 2000: | |
| body = body[:2000] + "…(truncated)" | |
| raise RuntimeError(f"[JSON] {context}: Failed to parse JSON | body={body}") from e | |
| def _gigya_raise_if_error(data: dict, context: str): | |
| error_code = data.get("errorCode") | |
| if error_code is None: | |
| return | |
| if isinstance(error_code, int) and error_code != 0: | |
| msg = data.get("errorMessage") or data.get("errorDetails") or "Unknown Gigya error" | |
| status_reason = data.get("statusReason") | |
| extra = [] | |
| if status_reason: | |
| extra.append(f"statusReason={status_reason}") | |
| if "errorDetails" in data and data.get("errorDetails") and data.get("errorDetails") != msg: | |
| extra.append(f"errorDetails={data.get('errorDetails')}") | |
| extra_s = (" | " + ", ".join(extra)) if extra else "" | |
| raise RuntimeError(f"[GIGYA] {context}: errorCode={error_code} message={msg}{extra_s}") | |
| def get_ayla_tokens_via_gigya(email: str, password: str) -> dict: | |
| print("Authenticating...") | |
| SDK_BUILD = 16650 | |
| API_KEY = "3_e5qn7USZK-QtsIso1wCelqUKAK_IVEsYshRIssQ-X-k55haiZXmKWDHDRul2e5Y2" | |
| CLIENT_ID = "1S8q1WJEs-emOB43Z0-66WnL" | |
| CLIENT_SECRET = "lmnceiD0B-4KPNN5ZS6WuWU70j9V5BCuSlz2OPsvHkyLryhMkJkPvKsivfTq3RfNYj8GpCELtOBvhaDIzKcBtg" | |
| AUTHORIZATION_HEADER = ( | |
| "Basic " + base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode() | |
| ) | |
| APP_ID = "DeLonghiComfort2-mw-id" | |
| APP_SECRET = "DeLonghiComfort2-Yg4miiqiNcf0Or-EhJwRh7ACfBY" | |
| BROWSER_USER_AGENT = "DeLonghiComfort/5.1.1" | |
| def get_query_param(url, param): | |
| return urllib.parse.parse_qs(urllib.parse.urlparse(url).query).get(param, [None])[0] | |
| auth_response = requests.get( | |
| f"https://fidm.eu1.gigya.com/oidc/op/v1.0/{API_KEY}/authorize", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| params={ | |
| "client_id": CLIENT_ID, | |
| "response_type": "code", | |
| "redirect_uri": "https://google.it", | |
| "scope": "openid email profile UID comfort en alexa", | |
| "nonce": str(int(datetime.now().timestamp())), | |
| }, | |
| allow_redirects=False, | |
| timeout=30, | |
| ) | |
| if "Location" not in auth_response.headers: | |
| raise RuntimeError( | |
| f"[OIDC] authorize: missing Location header. HTTP {auth_response.status_code} body={auth_response.text[:500]}" | |
| ) | |
| context = get_query_param(auth_response.headers["Location"], "context") | |
| if not context: | |
| raise RuntimeError("[OIDC] authorize: missing 'context' query parameter in redirect Location.") | |
| gigya_ids_resp = requests.get( | |
| "https://socialize.eu1.gigya.com/socialize.getIDs", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| params={ | |
| "APIKey": API_KEY, | |
| "includeTicket": True, | |
| "pageURL": "https://aylaopenid.delonghigroup.com/", | |
| "sdk": "js_latest", | |
| "sdkBuild": SDK_BUILD, | |
| "format": "json", | |
| }, | |
| timeout=30, | |
| ) | |
| gigya_session = _ensure_ok_json(gigya_ids_resp, "socialize.getIDs") | |
| _gigya_raise_if_error(gigya_session, "socialize.getIDs") | |
| ucid = gigya_session.get("ucid") | |
| gmid = gigya_session.get("gmid") | |
| gmid_ticket = gigya_session.get("gmidTicket") | |
| if not (ucid and gmid and gmid_ticket): | |
| raise KeyError(f"Missing expected fields from socialize.getIDs. keys={list(gigya_session.keys())}") | |
| login_resp = requests.post( | |
| "https://accounts.eu1.gigya.com/accounts.login", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| data={ | |
| "loginID": email, | |
| "password": password, | |
| "sessionExpiration": 7884009, | |
| "targetEnv": "jssdk", | |
| "include": "profile,data,emails,subscriptions,preferences", | |
| "includeUserInfo": True, | |
| "loginMode": "standard", | |
| "APIKey": API_KEY, | |
| "source": "showScreenSet", | |
| "sdk": "js_latest", | |
| "authMode": "cookie", | |
| "pageURL": "https://aylaopenid.delonghigroup.com/", | |
| "gmid": gmid, | |
| "ucid": ucid, | |
| "sdkBuild": SDK_BUILD, | |
| "format": "json", | |
| }, | |
| timeout=30, | |
| ) | |
| login_data = _ensure_ok_json(login_resp, "accounts.login") | |
| _gigya_raise_if_error(login_data, "accounts.login") | |
| session_info = login_data.get("sessionInfo") | |
| if not session_info: | |
| raise KeyError(f"accounts.login: sessionInfo missing. keys={list(login_data.keys())}") | |
| login_token = session_info.get("login_token") | |
| if not login_token: | |
| raise KeyError("accounts.login: sessionInfo.login_token missing.") | |
| userinfo_resp = requests.post( | |
| "https://socialize.eu1.gigya.com/socialize.getUserInfo", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| data={ | |
| "enabledProviders": "*", | |
| "APIKey": API_KEY, | |
| "sdk": "js_latest", | |
| "login_token": login_token, | |
| "authMode": "cookie", | |
| "pageURL": "https://aylaopenid.delonghigroup.com/", | |
| "gmid": gmid, | |
| "ucid": ucid, | |
| "sdkBuild": SDK_BUILD, | |
| "format": "json", | |
| }, | |
| timeout=30, | |
| ) | |
| user_info = _ensure_ok_json(userinfo_resp, "socialize.getUserInfo") | |
| _gigya_raise_if_error(user_info, "socialize.getUserInfo") | |
| user_uid = user_info.get("UID") | |
| user_uid_signature = user_info.get("UIDSignature") | |
| user_signature_timestamp = user_info.get("signatureTimestamp") | |
| if not (user_uid and user_uid_signature and user_signature_timestamp): | |
| raise KeyError(f"socialize.getUserInfo missing expected fields. keys={list(user_info.keys())}") | |
| consent_resp = requests.get( | |
| "https://aylaopenid.delonghigroup.com/OIDCConsentPage.php", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| params={ | |
| "context": context, | |
| "clientID": CLIENT_ID, | |
| "scope": "openid+email+profile+UID+comfort+en+alexa", | |
| "UID": user_uid, | |
| "UIDSignature": user_uid_signature, | |
| "signatureTimestamp": user_signature_timestamp, | |
| }, | |
| timeout=30, | |
| ) | |
| consent_resp.raise_for_status() | |
| consent_text = consent_resp.text | |
| if "const consentObj2Sig = '" not in consent_text: | |
| raise RuntimeError("[OIDC] Consent page did not contain expected consent signature marker.") | |
| signature = consent_text.split("const consentObj2Sig = '")[1].split("';")[0] | |
| cont_resp = requests.get( | |
| f"https://fidm.eu1.gigya.com/oidc/op/v1.0/{API_KEY}/authorize/continue", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| params={ | |
| "context": context, | |
| "login_token": login_token, | |
| "consent": json.dumps( | |
| { | |
| "scope": "openid email profile UID comfort en alexa", | |
| "clientID": CLIENT_ID, | |
| "context": context, | |
| "UID": user_uid, | |
| "consent": True, | |
| }, | |
| separators=(",", ":"), | |
| ), | |
| "sig": signature, | |
| "gmidTicket": gmid_ticket, | |
| }, | |
| allow_redirects=False, | |
| timeout=30, | |
| ) | |
| if "Location" not in cont_resp.headers: | |
| raise RuntimeError( | |
| f"[OIDC] authorize/continue: missing Location header. HTTP {cont_resp.status_code} body={cont_resp.text[:500]}" | |
| ) | |
| code = get_query_param(cont_resp.headers["Location"], "code") | |
| if not code: | |
| raise RuntimeError("[OIDC] authorize/continue: missing 'code' in redirect Location.") | |
| token_resp = requests.post( | |
| f"https://fidm.eu1.gigya.com/oidc/op/v1.0/{API_KEY}/token", | |
| headers={ | |
| "User-Agent": BROWSER_USER_AGENT, | |
| "Authorization": AUTHORIZATION_HEADER, | |
| "Content-Type": "application/x-www-form-urlencoded", | |
| }, | |
| data={ | |
| "code": code, | |
| "grant_type": "authorization_code", | |
| "redirect_uri": "https://google.it", | |
| }, | |
| timeout=30, | |
| ) | |
| idp_token_data = _ensure_ok_json(token_resp, "oidc/token") | |
| idp_token = idp_token_data.get("access_token") | |
| if not idp_token: | |
| raise KeyError(f"oidc/token: access_token missing. keys={list(idp_token_data.keys())}") | |
| ayla_signin_resp = requests.post( | |
| "https://user-field-eu.aylanetworks.com/api/v1/token_sign_in", | |
| headers={"User-Agent": BROWSER_USER_AGENT}, | |
| data={"app_id": APP_ID, "app_secret": APP_SECRET, "token": idp_token}, | |
| timeout=30, | |
| ) | |
| ayla_signin = _ensure_ok_json(ayla_signin_resp, "ayla/token_sign_in") | |
| refresh_token = ayla_signin.get("refresh_token") | |
| if not refresh_token: | |
| raise KeyError(f"ayla/token_sign_in: refresh_token missing. keys={list(ayla_signin.keys())}") | |
| access_token = ayla_signin.get("access_token") | |
| expires_in = ayla_signin.get("expires_in") | |
| print(f"Refresh token: {refresh_token}") | |
| print("Authentication successful!") | |
| return {"refresh_token": refresh_token, "access_token": access_token, "expires_in": expires_in} | |
| def ayla_refresh_access_token(refresh_token: str) -> dict: | |
| resp = requests.post( | |
| "https://user-field-eu.aylanetworks.com/users/refresh_token.json", | |
| json={"user": {"refresh_token": refresh_token}}, | |
| timeout=30, | |
| ) | |
| data = _ensure_ok_json(resp, "ayla/users/refresh_token.json") | |
| if "access_token" in data: | |
| return data | |
| if "user" in data and isinstance(data["user"], dict) and "access_token" in data["user"]: | |
| return data["user"] | |
| raise KeyError(f"Unexpected refresh_token response shape. keys={list(data.keys())}") | |
| def list_all_ayla_devices(access_token: str) -> list: | |
| headers = { | |
| "Authorization": f"auth_token {access_token}", | |
| "User-Agent": "DeLonghiComfort/5.1.1", | |
| "Accept": "application/json", | |
| } | |
| resp = requests.get( | |
| "https://ads-field-eu.aylanetworks.com/apiv1/devices.json", | |
| headers=headers, | |
| timeout=30, | |
| ) | |
| devices = _ensure_ok_json(resp, "ads/devices.json") | |
| if not isinstance(devices, list): | |
| raise TypeError(f"Unexpected devices response type: {type(devices)}") | |
| return devices | |
| def save_refresh_token(token_file: str, refresh_token: str): | |
| data = {"refresh_token": refresh_token} | |
| with open(token_file, "w", encoding="utf-8") as f: | |
| json.dump(data, f) | |
| print(f"[OK] Saved refresh token to: {token_file}") | |
| def load_refresh_token(token_file: str) -> str: | |
| with open(token_file, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| rt = data.get("refresh_token") | |
| if not rt: | |
| raise KeyError(f"refresh_token missing in {token_file}") | |
| return rt | |
| def main(): | |
| import argparse | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--token-file", default=TOKEN_FILE_DEFAULT, help="Where to store refresh_token") | |
| parser.add_argument("--use-refresh-only", action="store_true", help="Do not ask for email/password, use token-file") | |
| args = parser.parse_args() | |
| print("De'Longhi / Ayla Device Info Retriever (ALL devices)") | |
| print("===================================================\n") | |
| temp_token_path = None | |
| try: | |
| if args.use_refresh_only: | |
| refresh_token = load_refresh_token(args.token_file) | |
| print("[OK] Loaded refresh token from file ✅") | |
| refreshed = ayla_refresh_access_token(refresh_token) | |
| access_token = refreshed.get("access_token") | |
| refresh_token = refreshed.get("refresh_token", refresh_token) | |
| else: | |
| email = input("Email: ") | |
| if sys.stdin.isatty(): | |
| password = getpass.getpass("Password: ") | |
| else: | |
| print("Warning: Running in a non-interactive environment. Password will be visible.") | |
| password = input("Password: ") | |
| tokens = get_ayla_tokens_via_gigya(email, password) | |
| refresh_token = tokens["refresh_token"] | |
| save_refresh_token(args.token_file, refresh_token) | |
| access_token = tokens.get("access_token") | |
| if not access_token: | |
| refreshed = ayla_refresh_access_token(refresh_token) | |
| access_token = refreshed.get("access_token") | |
| refresh_token = refreshed.get("refresh_token", refresh_token) | |
| print("\nFetching ALL device information from Ayla (ADS)…") | |
| devices = list_all_ayla_devices(access_token) | |
| print(f"\nFound {len(devices)} devices.\n") | |
| crema_client = None | |
| if CremaClient is not None: | |
| try: | |
| with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", encoding="utf-8") as f: | |
| temp_token_path = f.name | |
| json.dump({"refresh_token": refresh_token}, f) | |
| crema_client = CremaClient(token_path=temp_token_path) | |
| except Exception as e: | |
| print(f"[WARN] cremalink init failed, continuing without LAN enrichment: {e}") | |
| crema_client = None | |
| for item in devices: | |
| dev = item.get("device", {}) | |
| dsn = dev.get("dsn") | |
| print("---------------------------------") | |
| print(f"Product name: {dev.get('product_name')}") | |
| print(f"Model: {dev.get('model')}") | |
| print(f"OEM Model: {dev.get('oem_model')}") | |
| print(f"DSN: {dsn}") | |
| print(f"Connected: {dev.get('connected')}") | |
| print(f"IP (cloud): {dev.get('ip')}") | |
| print(f"LAN enabled: {dev.get('lan_enabled')}") | |
| if crema_client and dsn: | |
| try: | |
| device = crema_client.get_device(dsn) | |
| print(f"LAN Key: {getattr(device, 'lan_key', None)}") | |
| print(f"LAN IP: {getattr(device, 'ip', None)}") | |
| print(f"Crema model: {getattr(device, 'model', None)}") | |
| except Exception as e: | |
| print(f"[INFO] cremalink details not available for {dsn}: {e}") | |
| print("---------------------------------") | |
| except Exception as e: | |
| print(f"\n[ERROR] {e}") | |
| finally: | |
| if temp_token_path and os.path.exists(temp_token_path): | |
| try: | |
| os.remove(temp_token_path) | |
| except Exception: | |
| pass | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment