Last active
October 3, 2025 05:00
-
-
Save mrjones-plip/f028724062d23c9c0c855827edbed9ac to your computer and use it in GitHub Desktop.
Make API call against collection of CHT instances, save results to individual JSON files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from os import walk | |
| import os | |
| import subprocess | |
| from datetime import date | |
| from dateutil.relativedelta import relativedelta | |
| def get_files(path): | |
| f = [] | |
| for (dirpath, dirnames, filenames) in walk(mypath): | |
| f.extend(filenames) | |
| return filenames | |
| def get_json_from_file(path): | |
| instance = "\n" + path.replace("./user-devices/", "").replace(".user-devices.json","") | |
| bytes = os.path.getsize(path) | |
| if bytes < 50: | |
| return "" | |
| six_months = date.today() + relativedelta(months=-6) | |
| jq_has_apk_is_chrome = f"jq -r '.[] | select(.apk|length> 0) | select(.browser.name == \"Chrome\") " | |
| jq_new_than_6mo = f"{jq_has_apk_is_chrome} | select(.date > \"{six_months}\") " | |
| jq_get_user_date_version = f"{jq_new_than_6mo} | \"\(.user) \(.date) \(.browser.version)\"'" | |
| sort_cut_combo = " sort -u -k1,2r | sort -u -k1,1 | cut -f3 -d' '|cut -f1 -d'.'|sort|uniq -c|sort -rn" | |
| command = f"{jq_get_user_date_version} {path} | {sort_cut_combo} | wc -l" | |
| # exit(command) | |
| total_chrome_versions, err = run_command(command) | |
| if int(total_chrome_versions) == 0: | |
| # return f"{instance},0,No APK Users Found,," | |
| return "" | |
| sort_cut_combo_without_count = " sort -u -k1,2r | sort -u -k1,1 | cut -f3 -d' '|cut -f1 -d'.'|sort|uniq |sort -rn" | |
| command = f"{jq_get_user_date_version} {path} | {sort_cut_combo_without_count} | tail -n1" | |
| # exit(command) | |
| lowest_chrome_version, err = run_command(command) | |
| total_users_sort = " sort -u -k1,2r | sort -u -k1,1 | cut -f3 -d' '|cut -f1 -d'.'|wc -l" | |
| command = f"{jq_get_user_date_version} {path} | {total_users_sort} | tail -n1" | |
| # exit(command) | |
| total_users, err = run_command(command) | |
| total_users_below_sort = "sort -u -k1,2r | sort -u -k1,1| cut -f3 -d' '|cut -f1 -d'.'|awk '$1<107'|wc -l" | |
| command = f"{jq_get_user_date_version} {path} | {total_users_below_sort} | tail -n1" | |
| # exit(command) | |
| total_users_below, err = run_command(command) | |
| return (f"{instance}," | |
| f"{total_chrome_versions.decode().strip()}," | |
| f"{lowest_chrome_version.decode().strip()}," | |
| f"{int(total_users.decode().strip())}," | |
| f"{total_users_below.decode().strip()}") | |
| def run_command(command): | |
| proc = subprocess.Popen( | |
| command, | |
| shell=True, | |
| executable="/bin/bash", | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE | |
| ) | |
| return proc.communicate() | |
| def output_final(rows): | |
| command = f"echo \"{rows}\" | sort -rn -k 3 " | |
| result, err = run_command(command) | |
| print("Instance,Total Chrome Vers,Lowest Chrome Ver,Uniq Users in JSON,Users Less than 107") | |
| print(result.decode()) | |
| mypath = "./user-devices" | |
| result = "" | |
| for file in get_files(mypath): | |
| file_path = f"{mypath}/{file}" | |
| result = get_json_from_file(file_path) + str(result) | |
| output_final(result) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import subprocess | |
| import requests | |
| import json | |
| import os | |
| def get_1p_password(domain): | |
| # Vaults | |
| vaults = [ | |
| "foo", # CouchDB Instances | |
| "quuuuux", # Dev,Test CouchDB Instances | |
| "smaaaaang", # Clones - Couchdb | |
| ] | |
| entry_names = [ | |
| domain, | |
| f"couchdb: {domain}", | |
| f"couchdb:{domain}", | |
| f"couchdb - {domain}" | |
| ] | |
| for vault in vaults: | |
| for entry in entry_names: | |
| try: | |
| pw_result = subprocess.run([ | |
| "op", "item", "get", entry, | |
| "--vault", vault, | |
| "--field", "password", | |
| "--reveal" | |
| ], capture_output=True, text=True, timeout=10) | |
| user_result = subprocess.run([ | |
| "op", "item", "get", entry, | |
| "--vault", vault, | |
| "--field", "username" | |
| ], capture_output=True, text=True, timeout=10) | |
| password = pw_result.stdout.strip() if pw_result.returncode == 0 else None | |
| username = user_result.stdout.strip() if user_result.returncode == 0 else None | |
| if password: | |
| if not username: | |
| username = "medic" | |
| return username, password | |
| except Exception as e: | |
| print(f"WARN: get_1p_password() error {e}") | |
| continue | |
| return None, None | |
| def site_reachable(site): | |
| try: | |
| resp = requests.get(site, timeout=5) | |
| if resp.status_code < 200 or resp.status_code >= 400: | |
| return False, resp.status_code | |
| except Exception as e: | |
| print(f"FAIL: {site} not reachable ({e})") | |
| return False, 000 | |
| return True, resp.status_code | |
| def run_command(command): | |
| proc = subprocess.Popen( | |
| command, | |
| shell=True, | |
| executable="/bin/bash", | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE | |
| ) | |
| return proc.communicate() | |
| def get_version(site): | |
| try: | |
| full_url = f"https://{site}/api/v1/monitoring" | |
| command = f"curl -qs '{full_url}'" | |
| version_json, err = run_command(command) | |
| membership = json.loads(version_json) | |
| except Exception as e: | |
| print(f"FAIL: {site} not reachable ({e})") | |
| return False | |
| if membership['version']['app']: | |
| return membership['version']['app'] | |
| else: | |
| return False | |
| def get_urls(file): | |
| urls = [] | |
| with open(file, "r") as url_file: | |
| urls_temp = url_file.readlines() | |
| for site in urls_temp: | |
| site = site.strip() | |
| if site: | |
| urls.append(site) | |
| return urls | |
| def file_exists(path): | |
| try: | |
| os.path.getsize(path) | |
| return True | |
| except Exception as e: | |
| return False | |
| def process_site(site): | |
| device_file = f"./user-devices/{site}.user-devices.json" | |
| if file_exists(device_file): | |
| bytes = os.path.getsize(device_file) | |
| if bytes < 50: | |
| print(f"WARN: {site} cached {bytes}b (less than 50b)") | |
| warn = "(WARNING less than 50 bytes)" | |
| else: | |
| print(f"SUCCESS: {site} cached {bytes}b") | |
| return True | |
| reachable, http_response = site_reachable(f"https://{site}") | |
| if not reachable: | |
| print(f"FAIL: {site} could not be reached. Response {http_response}") | |
| return False | |
| version = get_version(site) | |
| if not version: | |
| print(f"FAIL: {site} version not found") | |
| return False | |
| username, password = get_1p_password(site) | |
| if not password: | |
| print(f"FAIL: {site} on {version} could not get password from 1Password") | |
| return False | |
| auth = f"{username}:{password}" | |
| full_url = f"https://{site}/api/v2/export/user-devices" | |
| try: | |
| command = f"curl -sq -u '{auth}' '{full_url}' > {device_file}" | |
| subprocess.run(command, shell=True, executable="/bin/bash") | |
| bytes = os.path.getsize(device_file) | |
| except Exception as e: | |
| print(f"FAIL: {site} on {version} couldn't reach API: ({e})") | |
| return False | |
| if bytes > 50: | |
| print(f"SUCCESS: {site} on {version} wrote {bytes}b") | |
| return True | |
| else: | |
| print(f"WARN: {site} on {version} wrote {bytes}b (less than 50b)") | |
| return False | |
| urls = get_urls("urls.txt") | |
| print(f"Starting with {len(urls)} sites!") | |
| for site in urls: | |
| process_site(site) | |
| print("Done!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment