Last active
January 23, 2026 20:59
-
-
Save onderozcan/3ca0096f7b7e4cc3623a259816a3ed47 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import requests | |
| import json | |
| import logging | |
| import threading | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| logger = logging.getLogger(__name__) | |
| class APIClient: | |
| def __init__(self, base_url): | |
| self.base_url = base_url.rstrip('/') | |
| self.token = None | |
| self.user_info = {} | |
| self._sync_stop_event = None | |
| self._sync_thread = None | |
| self._bg_pool = ThreadPoolExecutor(max_workers=2) | |
| self._http = requests.Session() | |
| self._token_lock = threading.Lock() # Thread-safe token/session access | |
| def save_session(self, filepath): | |
| """Save current session (token + user_info) to disk (thread-safe).""" | |
| with self._token_lock: | |
| if not self.token: | |
| return | |
| session_data = { | |
| 'token': self.token, | |
| 'user_info': self.user_info.copy() | |
| } | |
| try: | |
| tmp_path = f"{filepath}.tmp" | |
| with open(tmp_path, 'w') as f: | |
| json.dump(session_data, f) | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| os.replace(tmp_path, filepath) | |
| except Exception as e: | |
| print(f"Failed to sav (thread-safe).""" | |
| try: | |
| with open(filepath, 'r') as f: | |
| data = json.load(f) | |
| # Thread-safe token update | |
| with self._token_lock: | |
| self.token = data.get('token') | |
| with open(filepath, 'r') as f: | |
| data = json.load(f) | |
| self.token = data.get('token') | |
| self.user_info = data.get('user_info', {}) | |
| return True | |
| except Exception: | |
| return False | |
| def clear_session(self, filepath): | |
| """Clear session from disk.""" | |
| try: | |
| import os | |
| if os.path.exists(filepath): | |
| os.remove(filepath) | |
| except Exception: | |
| pass | |
| def login(self, username, password): | |
| url = f"{self.base_url}/api/login/" | |
| try: | |
| resp = self._http.post(url, json={'username': username, 'password': password}, timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| # Thread-safe token update | |
| with self._token_lock: | |
| self.token = data['token'] | |
| self.user_info = { | |
| 'username': data.get('username'), | |
| 'organization': data.get('organization'), | |
| 'role': data.get('role') | |
| } | |
| return True, None | |
| else: | |
| return False, data.get('error', 'Login failed') | |
| except requests.exceptions.HTTPError as e: | |
| if e.response.status_code == 401: | |
| return False, "Email or password not matched" | |
| return False, "Connection error. Please try again." | |
| except Exception as e: | |
| return False, "Connection error. Please try again." | |
| def _headers(self): | |
| """Thread-safe headers generation.""" | |
| headers = {'Content-Type': 'application/json'} | |
| with self._token_lock: | |
| if self.token: | |
| headers['Authorization'] = f'Bearer {self.token}' | |
| return headers | |
| def _strip_local_fields(self, profile): | |
| blacklist = { | |
| 'profile_id', | |
| 'synced', | |
| 'last_synced_at', | |
| 'server_updated_at', | |
| 'sync_timestamp', | |
| 'server_id', | |
| 'deleted_at' | |
| } | |
| clean = {} | |
| for k, v in profile.items(): | |
| if k in blacklist: | |
| continue | |
| if isinstance(v, dict): | |
| clean[k] = v.copy() | |
| elif isinstance(v, list): | |
| clean[k] = [i.copy() if isinstance(i, dict) else i for i in v] | |
| else: | |
| clean[k] = v | |
| return clean | |
| def list_profiles(self): | |
| url = f"{self.base_url}/api/profiles/" | |
| try: | |
| resp = self._http.get(url, headers=self._headers(), timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| return data.get('profiles', []), None | |
| else: | |
| return [], data.get('error', 'Failed to list profiles') | |
| except Exception as e: | |
| return [], str(e) | |
| def get_tasks(self): | |
| url = f"{self.base_url}/api/tasks/" | |
| try: | |
| resp = self._http.get(url, headers=self._headers(), timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| tasks = data.get('tasks', []) | |
| return tasks, None | |
| else: | |
| error = data.get('error', 'Failed to list tasks') | |
| return [], error | |
| except Exception as e: | |
| return [], str(e) | |
| def start_task(self, task_id): | |
| url = f"{self.base_url}/api/tasks/start/" | |
| try: | |
| resp = self._http.post(url, json={'task_id': task_id}, headers=self._headers(), timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| return True, None | |
| else: | |
| return False, data.get('error', 'Failed to start task') | |
| except Exception as e: | |
| return False, str(e) | |
| def complete_task(self, task_id): | |
| url = f"{self.base_url}/api/tasks/complete/" | |
| try: | |
| resp = self._http.post(url, json={'task_id': task_id}, headers=self._headers(), timeout=10) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| return True, None | |
| else: | |
| return False, data.get('error', 'Failed to complete task') | |
| except Exception as e: | |
| return False, str(e) | |
| def checkout_profile(self, profile_id): | |
| url = f"{self.base_url}/api/checkout/" | |
| try: | |
| resp = self._http.post(url, json={'profile_id': profile_id}, headers=self._headers(), timeout=15) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| return data, None | |
| else: | |
| return None, data.get('error', 'Checkout failed') | |
| except Exception as e: | |
| return None, str(e) | |
| def checkin_profile(self, profile_id, cookies_json): | |
| url = f"{self.base_url}/api/checkin/" | |
| try: | |
| resp = self._http.post(url, json={'profile_id': profile_id, 'cookies_json': cookies_json}, headers=self._headers(), timeout=15) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get('success'): | |
| return True, None | |
| else: | |
| return False, data.get('error', 'Checkin failed') | |
| except Exception as e: | |
| return False, str(e) | |
| def heartbeat(self, profile_id): | |
| url = f"{self.base_url}/api/heartbeat/" | |
| try: | |
| resp = self._http.post(url, json={'profile_id': profile_id}, headers=self._headers(), timeout=5) | |
| return resp.status_code == 200 | |
| except: | |
| return False | |
| def autosave(self, profile_id, cookies_json): | |
| url = f"{self.base_url}/api/autosave/" | |
| try: | |
| resp = self._http.post(url, json={'profile_id': profile_id, 'cookies_json': cookies_json}, headers=self._headers(), timeout=10) | |
| return resp.status_code == 200 | |
| except: | |
| return False | |
| def create_profile(self, profile_data): | |
| url = f"{self.base_url}/api/profiles/create/" | |
| try: | |
| resp = self._http.post(url, json=profile_data, headers=self._headers(), timeout=15) | |
| if not resp.ok: | |
| try: | |
| resp_json = resp.json() | |
| error_msg = resp_json.get('error', resp.reason) | |
| # If server returns traceback, include it for debugging | |
| if 'traceback' in resp_json: | |
| error_msg += f"\n\nServer Traceback:\n{resp_json['traceback']}" | |
| except Exception: | |
| error_msg = resp.text or resp.reason | |
| return None, f"Server Error ({resp.status_code}): {error_msg}" | |
| data = resp.json() | |
| if data.get('success'): | |
| return data, None | |
| else: | |
| error_msg = data.get('error', 'Failed to create profile') | |
| if 'traceback' in data: | |
| error_msg += f"\n\nServer Traceback:\n{data['traceback']}" | |
| return None, error_msg | |
| except Exception as e: | |
| return None, str(e) | |
| def update_profile(self, profile_id, profile_data): | |
| url = f"{self.base_url}/api/profiles/{profile_id}/update/" | |
| print(f"\n{'='*60}") | |
| print(f"UPDATE PROFILE - ID: {profile_id}") | |
| print(f"Status: {profile_data.get('status')}") | |
| print(f"{'='*60}\n") | |
| try: | |
| resp = self._http.post(url, json=profile_data, headers=self._headers(), timeout=15) | |
| print(f"Response: {resp.status_code} - {resp.text[:200]}") | |
| if not resp.ok: | |
| try: | |
| resp_json = resp.json() | |
| error_msg = resp_json.get('error', resp.reason) | |
| if 'traceback' in resp_json: | |
| error_msg += f"\n\nServer Traceback:\n{resp_json['traceback']}" | |
| except Exception: | |
| error_msg = resp.text or resp.reason | |
| return None, f"Server Error ({resp.status_code}): {error_msg}" | |
| data = resp.json() | |
| if data.get('success'): | |
| return data, None | |
| else: | |
| error_msg = data.get('error', 'Failed to update profile') | |
| if 'traceback' in data: | |
| error_msg += f"\n\nServer Traceback:\n{data['traceback']}" | |
| return None, error_msg | |
| except Exception as e: | |
| print(f"EXCEPTION: {e}") | |
| return None, str(e) | |
| def delete_profile(self, profile_id): | |
| url = f"{self.base_url}/api/profiles/{profile_id}/delete/" | |
| try: | |
| resp = self._http.delete(url, headers=self._headers(), timeout=15) | |
| if not resp.ok: | |
| try: | |
| error_msg = resp.json().get('error', resp.reason) | |
| except: | |
| error_msg = resp.text or resp.reason | |
| return False, f"Server Error ({resp.status_code}): {error_msg}" | |
| data = resp.json() | |
| if data.get('success'): | |
| return True, None | |
| else: | |
| return False, data.get('error', 'Failed to delete profile') | |
| except Exception as e: | |
| return False, str(e) | |
| # ==================== OFFLINE-FIRST METHODS ==================== | |
| def start_sync_worker(self, local_db): | |
| """ | |
| Start background sync worker with retry logic. | |
| Processes sync queue with exponential backoff. | |
| """ | |
| def _worker(): | |
| while True: | |
| if self._sync_stop_event and self._sync_stop_event.is_set(): | |
| logger.info("π Sync worker stopping") | |
| return | |
| try: | |
| queue = local_db.get_sync_queue() | |
| if not queue: | |
| time.sleep(5) | |
| continue | |
| for item in queue: | |
| # Check if retry delay passed | |
| retry_delay = local_db.get_retry_delay(item['retry_count']) | |
| if item['last_attempt'] and (time.time() - item['last_attempt']) < retry_delay: | |
| continue # Too soon to retry | |
| local_db.mark_sync_in_progress(item['id']) | |
| # Process sync item | |
| action = item['action'] | |
| profile_id = item['profile_id'] | |
| data = item['data'] | |
| logger.info(f"π Processing sync: {action} {profile_id} (attempt {item['retry_count'] + 1})") | |
| try: | |
| if action == 'create': | |
| payload = self._strip_local_fields(data) | |
| resp, err = self.create_profile(payload) | |
| if resp: | |
| local_db.mark_sync_completed(item['id'], resp) | |
| else: | |
| local_db.mark_sync_failed(item['id'], err or "Unknown error") | |
| elif action == 'update': | |
| server_id = data.get('server_id') | |
| if not server_id: | |
| local_db.mark_sync_failed(item['id'], "Missing server_id") | |
| continue | |
| payload = self._strip_local_fields(data) | |
| resp, err = self.update_profile(server_id, payload) | |
| if resp: | |
| local_db.mark_sync_completed(item['id'], resp) | |
| else: | |
| local_db.mark_sync_failed(item['id'], err or "Unknown error") | |
| elif action == 'delete': | |
| server_id = data.get('server_id') | |
| if not server_id: | |
| local_db.mark_sync_failed(item['id'], "Missing server_id") | |
| continue | |
| success, err = self.delete_profile(server_id) | |
| if success: | |
| local_db.mark_sync_completed(item['id'], {'profile_id': server_id}) | |
| else: | |
| local_db.mark_sync_failed(item['id'], err or "Unknown error") | |
| except Exception as e: | |
| logger.error(f"Sync error: {e}") | |
| local_db.mark_sync_failed(item['id'], str(e)) | |
| # Rate limiting between syncs | |
| time.sleep(0.5) | |
| except Exception as e: | |
| logger.error(f"Sync worker error: {e}") | |
| # Check queue every 5 seconds | |
| time.sleep(5) | |
| self._sync_stop_event = threading.Event() | |
| self._sync_thread = threading.Thread(target=_worker, daemon=True) | |
| self._sync_thread.start() | |
| logger.info("π Sync worker started") | |
| def stop_sync_worker(self): | |
| """Stop background sync worker.""" | |
| if self._sync_stop_event: | |
| self._sync_stop_event.set() | |
| def create_profile_async(self, profile_data, callback=None): | |
| """ | |
| Create profile on server asynchronously (non-blocking). | |
| Returns immediately. Callback called with (success, result, error). | |
| """ | |
| def _sync(): | |
| try: | |
| resp, err = self.create_profile(profile_data) | |
| if callback: | |
| callback(resp is not None, resp, err) | |
| except Exception as e: | |
| logger.error(f"Background sync failed: {e}") | |
| if callback: | |
| callback(False, None, str(e)) | |
| self._bg_pool.submit(_sync) | |
| logger.info("π Profile sync queued (background)") | |
| def update_profile_async(self, profile_id, profile_data, callback=None): | |
| """ | |
| Update profile on server asynchronously (non-blocking). | |
| Returns immediately. Callback called with (success, result, error). | |
| """ | |
| def _sync(): | |
| try: | |
| resp, err = self.update_profile(profile_id, profile_data) | |
| if callback: | |
| callback(resp is not None, resp, err) | |
| except Exception as e: | |
| logger.error(f"Background sync failed: {e}") | |
| if callback: | |
| callback(False, None, str(e)) | |
| self._bg_pool.submit(_sync) | |
| logger.info(f"π Profile update synced (background): {profile_id}") | |
| def list_profiles_with_fallback(self, fallback_profiles): | |
| """ | |
| Fetch profiles from server with timeout. | |
| On failure, return fallback_profiles. | |
| Args: | |
| fallback_profiles: Local profiles to use if server unavailable | |
| Returns: | |
| (profiles, is_from_server) | |
| """ | |
| try: | |
| profiles, err = self.list_profiles() | |
| if err: | |
| logger.warning(f"Server fetch failed: {err}. Using local cache.") | |
| return fallback_profiles, False | |
| return profiles, True | |
| except Exception as e: | |
| logger.warning(f"Server unreachable: {e}. Using local cache.") | |
| return fallback_profiles, False | |
| def check_profile_metadata(self, profile_id): | |
| """ | |
| Lightweight check: Get only profile metadata (timestamp, locked status). | |
| Fast check without full checkout. | |
| Returns: | |
| (metadata_dict, error) or (None, error) | |
| """ | |
| url = f"{self.base_url}/api/profiles/{profile_id}/metadata/" | |
| try: | |
| resp = self._http.get(url, headers=self._headers(), timeout=3) | |
| if resp.ok: | |
| data = resp.json() | |
| if data.get('success'): | |
| return data.get('metadata'), None | |
| return None, "Metadata fetch failed" | |
| except Exception as e: | |
| return None, str(e) | |
| def checkout_profile_with_fallback(self, profile_id, fallback_data): | |
| """ | |
| Checkout profile from server with timeout. | |
| On failure, return fallback_data (local cache). | |
| Args: | |
| profile_id: Profile ID | |
| fallback_data: Local profile data to use if server unavailable | |
| Returns: | |
| (data, is_from_server) | |
| """ | |
| try: | |
| data, err = self.checkout_profile(profile_id) | |
| if err: | |
| logger.warning(f"Server checkout failed: {err}. Using local cache.") | |
| return fallback_data, False | |
| return data, True | |
| except Exception as e: | |
| logger.warning(f"Server unreachable: {e}. Using local cache.") | |
| return fallback_data, False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment