# Source: GitHub Gist onderozcan/3ca0096f7b7e4cc3623a259816a3ed47
# Author: @onderozcan
# Last active: January 23, 2026 20:59
# (GitHub page navigation text from the web capture was removed; it is not part of the module.)
import os
import requests
import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
class APIClient:
    """HTTP client for the profile/task API, with offline-first sync helpers.

    All requests go through one shared ``requests.Session``. The bearer token
    and ``user_info`` are guarded by ``_token_lock`` so they can be read and
    replaced from the background threads this class starts.

    Most methods report failure through their return value (an error string
    or ``False``) instead of raising.
    """

    # Keys dropped from a profile dict before it is sent to the server
    # (see _strip_local_fields).
    _LOCAL_ONLY_FIELDS = frozenset({
        'profile_id',
        'synced',
        'last_synced_at',
        'server_updated_at',
        'sync_timestamp',
        'server_id',
        'deleted_at',
    })

    def __init__(self, base_url):
        """Create a client for ``base_url`` (a trailing slash is removed)."""
        self.base_url = base_url.rstrip('/')
        self.token = None
        self.user_info = {}
        self._sync_stop_event = None
        self._sync_thread = None
        self._bg_pool = ThreadPoolExecutor(max_workers=2)
        self._http = requests.Session()
        self._token_lock = threading.Lock()  # Thread-safe token/session access

    # ==================== SESSION PERSISTENCE ====================

    def save_session(self, filepath):
        """Save current session (token + user_info) to disk (thread-safe).

        Does nothing when there is no token. The data is written to
        ``<filepath>.tmp`` and then moved over ``filepath`` with
        ``os.replace`` so a partially written file never replaces a good one.
        Failures are logged, not raised.
        """
        # Snapshot under the lock; do the file I/O outside it.
        with self._token_lock:
            if not self.token:
                return
            session_data = {
                'token': self.token,
                'user_info': self.user_info.copy(),
            }
        tmp_path = f"{filepath}.tmp"
        try:
            with open(tmp_path, 'w') as f:
                json.dump(session_data, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, filepath)
        except Exception as e:
            logger.error(f"Failed to save session: {e}")
            # Do not leave a stale temp file behind after a failed write.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def load_session(self, filepath):
        """Load a saved session from disk (thread-safe).

        Returns:
            True when the file was read and parsed, False otherwise. On
            failure the current token and user_info are left untouched.
        """
        try:
            with open(filepath, 'r') as f:
                data = json.load(f)
            token = data.get('token')
            user_info = data.get('user_info', {})
        except Exception:
            return False
        # Thread-safe token update
        with self._token_lock:
            self.token = token
            self.user_info = user_info
        return True

    def clear_session(self, filepath):
        """Clear session from disk. Best effort: never raises."""
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass  # Nothing to clear.
        except Exception as e:
            logger.warning(f"Failed to clear session: {e}")

    # ==================== AUTH ====================

    def login(self, username, password):
        """Log in and store the returned token and user info.

        Returns:
            (True, None) on success, otherwise (False, error_message).
        """
        url = f"{self.base_url}/api/login/"
        try:
            resp = self._http.post(url, json={'username': username, 'password': password}, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            if not data.get('success'):
                return False, data.get('error', 'Login failed')
            token = data['token']
            user_info = {
                'username': data.get('username'),
                'organization': data.get('organization'),
                'role': data.get('role'),
            }
            # Thread-safe token update
            with self._token_lock:
                self.token = token
                self.user_info = user_info
            return True, None
        except requests.exceptions.HTTPError as e:
            response = getattr(e, 'response', None)
            if response is not None and response.status_code == 401:
                return False, "Email or password not matched"
            return False, "Connection error. Please try again."
        except Exception:
            return False, "Connection error. Please try again."

    def _headers(self):
        """Thread-safe headers generation (adds Authorization when a token is set)."""
        headers = {'Content-Type': 'application/json'}
        with self._token_lock:
            if self.token:
                headers['Authorization'] = f'Bearer {self.token}'
        return headers

    def _strip_local_fields(self, profile):
        """Return a copy of ``profile`` without the local-only keys.

        Dict values, and dicts inside list values, are shallow-copied so the
        result can be modified without touching the caller's nested objects.
        """
        clean = {}
        for k, v in profile.items():
            if k in self._LOCAL_ONLY_FIELDS:
                continue
            if isinstance(v, dict):
                clean[k] = v.copy()
            elif isinstance(v, list):
                clean[k] = [i.copy() if isinstance(i, dict) else i for i in v]
            else:
                clean[k] = v
        return clean

    # ==================== REQUEST HELPERS ====================

    def _api_call(self, method, path, default_error, timeout, payload=None):
        """Send one request and decode the ``{'success': ...}`` JSON envelope.

        Args:
            method: Session method name ('get' or 'post').
            path: URL path appended to ``base_url``.
            default_error: Message used when the body carries no 'error' key.
            timeout: Request timeout in seconds.
            payload: Optional JSON body.

        Returns:
            (data, None) when the body's 'success' flag is truthy, otherwise
            (None, error). HTTP error statuses and any exception are
            reported as ``str(exception)``.
        """
        url = f"{self.base_url}{path}"
        try:
            kwargs = {'headers': self._headers(), 'timeout': timeout}
            if payload is not None:
                kwargs['json'] = payload
            resp = getattr(self._http, method)(url, **kwargs)
            resp.raise_for_status()
            data = resp.json()
            if data.get('success'):
                return data, None
            return None, data.get('error', default_error)
        except Exception as e:
            return None, str(e)

    @staticmethod
    def _error_text(body, default_error):
        """Build an error message from a decoded JSON body.

        A server traceback, when present, is appended for debugging.
        """
        error_msg = body.get('error', default_error)
        if 'traceback' in body:
            error_msg += f"\n\nServer Traceback:\n{body['traceback']}"
        return error_msg

    def _profile_write(self, method, url, default_error, payload=None):
        """Send a create/update/delete request and decode the outcome.

        Unlike ``_api_call`` this reads the error body of a non-2xx response
        so the server's own message (and traceback) reaches the caller.

        Returns:
            (data, None) on success, otherwise (None, error_message).
        """
        try:
            kwargs = {'headers': self._headers(), 'timeout': 15}
            if payload is not None:
                kwargs['json'] = payload
            resp = getattr(self._http, method)(url, **kwargs)
            if not resp.ok:
                try:
                    error_msg = self._error_text(resp.json(), resp.reason)
                except Exception:
                    # Body is not usable JSON: fall back to the raw text.
                    error_msg = resp.text or resp.reason
                return None, f"Server Error ({resp.status_code}): {error_msg}"
            data = resp.json()
            if data.get('success'):
                return data, None
            return None, self._error_text(data, default_error)
        except Exception as e:
            return None, str(e)

    # ==================== PROFILES & TASKS ====================

    def list_profiles(self):
        """Return (profiles, None) on success or ([], error) on failure."""
        data, err = self._api_call('get', '/api/profiles/', 'Failed to list profiles', 10)
        if data is None:
            return [], err
        return data.get('profiles', []), None

    def get_tasks(self):
        """Return (tasks, None) on success or ([], error) on failure."""
        data, err = self._api_call('get', '/api/tasks/', 'Failed to list tasks', 10)
        if data is None:
            return [], err
        return data.get('tasks', []), None

    def start_task(self, task_id):
        """Mark a task as started. Returns (True, None) or (False, error)."""
        data, err = self._api_call('post', '/api/tasks/start/', 'Failed to start task', 10,
                                   {'task_id': task_id})
        return data is not None, err

    def complete_task(self, task_id):
        """Mark a task as completed. Returns (True, None) or (False, error)."""
        data, err = self._api_call('post', '/api/tasks/complete/', 'Failed to complete task', 10,
                                   {'task_id': task_id})
        return data is not None, err

    def checkout_profile(self, profile_id):
        """Check out a profile. Returns (response_data, None) or (None, error)."""
        return self._api_call('post', '/api/checkout/', 'Checkout failed', 15,
                              {'profile_id': profile_id})

    def checkin_profile(self, profile_id, cookies_json):
        """Check a profile back in with its cookies. Returns (True, None) or (False, error)."""
        data, err = self._api_call('post', '/api/checkin/', 'Checkin failed', 15,
                                   {'profile_id': profile_id, 'cookies_json': cookies_json})
        return data is not None, err

    def heartbeat(self, profile_id):
        """Send a heartbeat for a profile. Returns True only on HTTP 200."""
        url = f"{self.base_url}/api/heartbeat/"
        try:
            resp = self._http.post(url, json={'profile_id': profile_id}, headers=self._headers(), timeout=5)
            return resp.status_code == 200
        except Exception:
            return False

    def autosave(self, profile_id, cookies_json):
        """Autosave a profile's cookies. Returns True only on HTTP 200."""
        url = f"{self.base_url}/api/autosave/"
        try:
            resp = self._http.post(url, json={'profile_id': profile_id, 'cookies_json': cookies_json},
                                   headers=self._headers(), timeout=10)
            return resp.status_code == 200
        except Exception:
            return False

    def create_profile(self, profile_data):
        """Create a profile. Returns (response_data, None) or (None, error)."""
        url = f"{self.base_url}/api/profiles/create/"
        return self._profile_write('post', url, 'Failed to create profile', profile_data)

    def update_profile(self, profile_id, profile_data):
        """Update a profile. Returns (response_data, None) or (None, error)."""
        url = f"{self.base_url}/api/profiles/{profile_id}/update/"
        logger.debug(f"UPDATE PROFILE - ID: {profile_id} - Status: {profile_data.get('status')}")
        result, err = self._profile_write('post', url, 'Failed to update profile', profile_data)
        if err is not None:
            logger.debug(f"UPDATE PROFILE failed - ID: {profile_id}: {err}")
        return result, err

    def delete_profile(self, profile_id):
        """Delete a profile. Returns (True, None) or (False, error)."""
        url = f"{self.base_url}/api/profiles/{profile_id}/delete/"
        data, err = self._profile_write('delete', url, 'Failed to delete profile')
        return data is not None, err

    # ==================== OFFLINE-FIRST METHODS ====================

    @staticmethod
    def _retry_delay_elapsed(local_db, item):
        """Return True when ``item`` may be attempted (again) now."""
        retry_delay = local_db.get_retry_delay(item['retry_count'])
        last_attempt = item['last_attempt']
        return not last_attempt or (time.time() - last_attempt) >= retry_delay

    def _process_sync_item(self, local_db, item):
        """Run one queued create/update/delete and record the outcome in ``local_db``."""
        local_db.mark_sync_in_progress(item['id'])
        action = item['action']
        profile_id = item['profile_id']
        data = item['data']
        logger.info(f"🔄 Processing sync: {action} {profile_id} (attempt {item['retry_count'] + 1})")
        try:
            if action == 'create':
                result, err = self.create_profile(self._strip_local_fields(data))
            elif action in ('update', 'delete'):
                server_id = data.get('server_id')
                if not server_id:
                    local_db.mark_sync_failed(item['id'], "Missing server_id")
                    return
                if action == 'update':
                    result, err = self.update_profile(server_id, self._strip_local_fields(data))
                else:
                    success, err = self.delete_profile(server_id)
                    result = {'profile_id': server_id} if success else None
            else:
                # Without this the item would stay "in progress" forever.
                result, err = None, f"Unknown sync action: {action}"
            if result:
                local_db.mark_sync_completed(item['id'], result)
            else:
                local_db.mark_sync_failed(item['id'], err or "Unknown error")
        except Exception as e:
            logger.error(f"Sync error: {e}")
            local_db.mark_sync_failed(item['id'], str(e))

    def start_sync_worker(self, local_db):
        """
        Start background sync worker with retry logic.
        Polls local_db.get_sync_queue() every 5 seconds; an item is skipped
        until the delay from local_db.get_retry_delay() has passed since its
        last attempt. If a worker is already running, no second one is started.
        """
        running = self._sync_thread is not None and self._sync_thread.is_alive()
        if running and self._sync_stop_event is not None and not self._sync_stop_event.is_set():
            logger.info("🔄 Sync worker already running")
            return

        # Each worker owns its event, so replacing self._sync_stop_event later
        # cannot make an older worker miss its stop request.
        stop_event = threading.Event()

        def _worker():
            while not stop_event.is_set():
                try:
                    for item in local_db.get_sync_queue() or ():
                        if stop_event.is_set():
                            break
                        if not self._retry_delay_elapsed(local_db, item):
                            continue  # Too soon to retry
                        self._process_sync_item(local_db, item)
                        # Rate limiting between syncs
                        stop_event.wait(0.5)
                except Exception as e:
                    logger.error(f"Sync worker error: {e}")
                # Check queue every 5 seconds (returns early on stop)
                stop_event.wait(5)
            logger.info("🛑 Sync worker stopping")

        self._sync_stop_event = stop_event
        self._sync_thread = threading.Thread(target=_worker, daemon=True)
        self._sync_thread.start()
        logger.info("🔄 Sync worker started")

    def stop_sync_worker(self):
        """Stop background sync worker (signals it; does not wait for exit)."""
        if self._sync_stop_event:
            self._sync_stop_event.set()

    def _run_in_background(self, operation, args, callback):
        """Run ``operation(*args)`` on the background pool, then call ``callback`` once.

        ``operation`` must return ``(result, error)``. The callback receives
        ``(success, result, error)``. An exception raised by the callback is
        logged and does not trigger a second callback invocation.
        """
        def _job():
            try:
                resp, err = operation(*args)
            except Exception as e:
                logger.error(f"Background sync failed: {e}")
                resp, err = None, str(e)
            if callback:
                try:
                    callback(resp is not None, resp, err)
                except Exception:
                    logger.exception("Background sync callback failed")

        self._bg_pool.submit(_job)

    def create_profile_async(self, profile_data, callback=None):
        """
        Create profile on server asynchronously (non-blocking).
        Returns immediately. Callback called with (success, result, error).
        """
        self._run_in_background(self.create_profile, (profile_data,), callback)
        logger.info("🔄 Profile sync queued (background)")

    def update_profile_async(self, profile_id, profile_data, callback=None):
        """
        Update profile on server asynchronously (non-blocking).
        Returns immediately. Callback called with (success, result, error).
        """
        self._run_in_background(self.update_profile, (profile_id, profile_data), callback)
        logger.info(f"🔄 Profile update queued (background): {profile_id}")

    def list_profiles_with_fallback(self, fallback_profiles):
        """
        Fetch profiles from server with timeout.
        On failure, return fallback_profiles.
        Args:
            fallback_profiles: Local profiles to use if server unavailable
        Returns:
            (profiles, is_from_server)
        """
        try:
            profiles, err = self.list_profiles()
            if err:
                logger.warning(f"Server fetch failed: {err}. Using local cache.")
                return fallback_profiles, False
            return profiles, True
        except Exception as e:
            logger.warning(f"Server unreachable: {e}. Using local cache.")
            return fallback_profiles, False

    def check_profile_metadata(self, profile_id):
        """
        Lightweight check: fetch only the profile's metadata (3 second timeout).
        Returns:
            (metadata_dict, None) on success, otherwise (None, error)
        """
        url = f"{self.base_url}/api/profiles/{profile_id}/metadata/"
        try:
            resp = self._http.get(url, headers=self._headers(), timeout=3)
            if resp.ok:
                data = resp.json()
                if data.get('success'):
                    return data.get('metadata'), None
            return None, "Metadata fetch failed"
        except Exception as e:
            return None, str(e)

    def checkout_profile_with_fallback(self, profile_id, fallback_data):
        """
        Checkout profile from server with timeout.
        On failure, return fallback_data (local cache).
        Args:
            profile_id: Profile ID
            fallback_data: Local profile data to use if server unavailable
        Returns:
            (data, is_from_server)
        """
        try:
            data, err = self.checkout_profile(profile_id)
            if err:
                logger.warning(f"Server checkout failed: {err}. Using local cache.")
                return fallback_data, False
            return data, True
        except Exception as e:
            logger.warning(f"Server unreachable: {e}. Using local cache.")
            return fallback_data, False
# (GitHub page footer from the web capture: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")