Skip to content

Instantly share code, notes, and snippets.

@mrjones-plip
Last active October 3, 2025 05:00
Show Gist options
  • Select an option

  • Save mrjones-plip/f028724062d23c9c0c855827edbed9ac to your computer and use it in GitHub Desktop.

Select an option

Save mrjones-plip/f028724062d23c9c0c855827edbed9ac to your computer and use it in GitHub Desktop.
Make API call against collection of CHT instances, save results to individual JSON files
from os import walk
import os
import subprocess
from datetime import date
from dateutil.relativedelta import relativedelta
def get_files(path):
    """Return the names of the files located directly inside *path*.

    Only the top level of *path* is listed; names are returned without any
    directory prefix, so callers join them with *path* themselves.

    Args:
        path: Directory to list.

    Returns:
        A list of file names. Empty when *path* does not exist or contains
        no files.
    """
    # os.walk yields the top directory first; returning on that first
    # iteration keeps the result to the directory's own files. When the
    # directory is missing, walk yields nothing and we fall through.
    for _dirpath, _dirnames, filenames in walk(path):
        return list(filenames)
    return []
def get_json_from_file(path):
    """Summarise Chrome versions found in one user-devices export.

    The work is done by a ``jq`` + coreutils pipeline run through
    ``run_command``. Only entries with a non-empty ``apk`` field, a browser
    name of "Chrome" and a ``date`` later than six months ago are counted.
    The records are reduced to one line per user (the sort pair is presumably
    meant to keep the most recent entry - verify against real data) and the
    Chrome major version is taken from that line.

    Args:
        path: Path to a ``<instance>.user-devices.json`` file.

    Returns:
        An empty string when the file is smaller than 50 bytes or when no
        entry matches the filter. Otherwise one CSV row, prefixed with a
        newline so rows can be concatenated directly:
        ``\\n<instance>,<distinct major versions>,<lowest major version>,``
        ``<users counted>,<users below version 107>``.
    """
    import shlex  # function-scope import keeps this block self-contained

    instance = "\n" + path.replace("./user-devices/", "").replace(".user-devices.json", "")
    if os.path.getsize(path) < 50:
        return ""

    six_months = date.today() + relativedelta(months=-6)
    jq_has_apk_is_chrome = "jq -r '.[] | select(.apk|length> 0) | select(.browser.name == \"Chrome\") "
    jq_newer_than_6mo = f"{jq_has_apk_is_chrome} | select(.date > \"{six_months}\") "
    # "\\(" yields the literal backslash jq needs for string interpolation,
    # without relying on Python passing an invalid escape through unchanged.
    jq_user_date_version = f"{jq_newer_than_6mo} | \"\\(.user) \\(.date) \\(.browser.version)\"'"
    # One line per user, reduced to the Chrome major version.
    major_per_user = "sort -u -k1,2r | sort -u -k1,1 | cut -f3 -d' ' | cut -f1 -d'.'"
    # Quote the path so spaces or shell metacharacters in a file name cannot
    # break (or inject into) the command line.
    source = f"{jq_user_date_version} {shlex.quote(path)} | {major_per_user}"

    def pipeline_output(tail):
        """Run the shared pipeline followed by *tail*; return stripped stdout."""
        stdout, _stderr = run_command(f"{source} | {tail}")
        return stdout.decode().strip()

    total_chrome_versions = pipeline_output("sort | uniq -c | sort -rn | wc -l")
    if int(total_chrome_versions) == 0:
        return ""

    lowest_chrome_version = pipeline_output("sort | uniq | sort -rn | tail -n1")
    total_users = pipeline_output("wc -l | tail -n1")
    total_users_below = pipeline_output("awk '$1<107' | wc -l | tail -n1")

    return (f"{instance},"
            f"{total_chrome_versions},"
            f"{lowest_chrome_version},"
            f"{int(total_users)},"
            f"{total_users_below}")
def run_command(command):
    """Execute *command* with /bin/bash and wait for it to finish.

    Args:
        command: Shell command line to run.

    Returns:
        A ``(stdout, stderr)`` tuple of bytes captured from the process.
    """
    completed = subprocess.run(
        command,
        shell=True,
        executable="/bin/bash",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return completed.stdout, completed.stderr
def output_final(rows):
    """Print the CSV report: a header line followed by the sorted rows.

    Rows are ordered by their third column (lowest Chrome version), highest
    first, comparing numerically. Sorting is done in Python because the rows
    are comma-separated: a whitespace-delimited ``sort -k 3`` never sees a
    third field on such lines, and passing row text through ``echo`` would
    expose it to shell interpretation.

    Args:
        rows: Newline-separated CSV rows; blank lines are ignored.
    """
    def sort_key(row):
        columns = row.split(",")
        try:
            lowest_version = int(columns[2])
        except (IndexError, ValueError):
            # Same fallback as `sort -n`: a missing or non-numeric key is 0.
            lowest_version = 0
        # The full row breaks ties so the output order is deterministic.
        return lowest_version, row

    lines = [line for line in rows.splitlines() if line.strip()]
    print("Instance,Total Chrome Vers,Lowest Chrome Ver,Uniq Users in JSON,Users Less than 107")
    print("\n".join(sorted(lines, key=sort_key, reverse=True)))
# Report driver: summarise every export under ./user-devices and print the
# combined CSV.
mypath = "./user-devices"
collected = [get_json_from_file(f"{mypath}/{file}") for file in get_files(mypath)]
# Each row carries its own leading newline, so plain concatenation is enough;
# the row for the last file listed goes first.
result = "".join(reversed(collected))
output_final(result)
import subprocess
import requests
import json
import os
def get_1p_password(domain):
    """Look up CouchDB credentials for *domain* with the 1Password ``op`` CLI.

    Every vault is searched for each of the entry-name spellings below; the
    first entry that yields a non-empty password wins.

    Args:
        domain: Host name of the instance, as used in the entry title.

    Returns:
        A ``(username, password)`` tuple. The username falls back to
        ``"medic"`` when the entry has none. ``(None, None)`` when no entry
        with a password is found.
    """
    # Vaults
    vaults = [
        "foo",  # CouchDB Instances
        "quuuuux",  # Dev,Test CouchDB Instances
        "smaaaaang",  # Clones - Couchdb
    ]
    entry_names = [
        domain,
        f"couchdb: {domain}",
        f"couchdb:{domain}",
        f"couchdb - {domain}",
    ]
    for vault in vaults:
        for entry in entry_names:
            lookup = ["op", "item", "get", entry, "--vault", vault]
            try:
                pw_result = subprocess.run(
                    lookup + ["--field", "password", "--reveal"],
                    capture_output=True, text=True, timeout=10)
                password = pw_result.stdout.strip() if pw_result.returncode == 0 else None
                if not password:
                    # No usable password here, so skip the username lookup
                    # entirely instead of spending a second `op` call on it.
                    continue
                user_result = subprocess.run(
                    lookup + ["--field", "username"],
                    capture_output=True, text=True, timeout=10)
            except (subprocess.SubprocessError, OSError, ValueError) as e:
                # Timeout, missing `op` binary or unusable argument: warn and
                # try the next candidate rather than aborting the search.
                print(f"WARN: get_1p_password() error {e}")
                continue
            username = user_result.stdout.strip() if user_result.returncode == 0 else None
            return username or "medic", password
    return None, None
def site_reachable(site):
    """Check whether a GET request to *site* succeeds.

    Args:
        site: Full URL to request (5 second timeout).

    Returns:
        ``(reachable, status_code)``. ``reachable`` is True for status codes
        from 200 up to, but not including, 400. When the request itself
        fails, a FAIL line is printed and ``(False, 0)`` is returned.
    """
    try:
        status = requests.get(site, timeout=5).status_code
    except Exception as e:
        print(f"FAIL: {site} not reachable ({e})")
        return False, 0
    return 200 <= status < 400, status
def run_command(command):
    """Run *command* in a /bin/bash shell and collect its output.

    Args:
        command: Shell command line to execute.

    Returns:
        The ``(stdout, stderr)`` pair of bytes produced by the process.
    """
    popen_options = {
        "shell": True,
        "executable": "/bin/bash",
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
    }
    with subprocess.Popen(command, **popen_options) as proc:
        captured = proc.communicate()
    return captured
def get_version(site):
    """Return the app version reported by *site*'s monitoring endpoint.

    Fetches ``https://<site>/api/v1/monitoring`` with ``curl`` and reads
    ``version.app`` from the JSON response.

    Args:
        site: Host name (without scheme) of the instance.

    Returns:
        The ``version.app`` value, or ``False`` when the request fails, the
        response is not JSON, or the value is missing or empty.
    """
    full_url = f"https://{site}/api/v1/monitoring"
    try:
        # Argument list instead of a shell string: a quote or other shell
        # metacharacter in the host name cannot alter the command.
        completed = subprocess.run(
            ["curl", "-qs", full_url],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        membership = json.loads(completed.stdout)
    except Exception as e:
        print(f"FAIL: {site} not reachable ({e})")
        return False

    try:
        app_version = membership["version"]["app"]
    except (KeyError, TypeError, IndexError):
        # Valid JSON without version.app (for example an error document):
        # report "no version" instead of crashing the whole run.
        return False
    return app_version or False
def get_urls(file):
    """Load the site list from *file*.

    Args:
        file: Path to a text file with one site per line.

    Returns:
        The lines of the file with surrounding whitespace removed, in file
        order, skipping lines that are empty after stripping.
    """
    with open(file, "r") as url_file:
        stripped_lines = [raw.strip() for raw in url_file.readlines()]
    return [entry for entry in stripped_lines if entry]
def file_exists(path):
    """Report whether *path* can be stat'ed.

    Args:
        path: Filesystem path to probe.

    Returns:
        True when the path's metadata can be read, False when that attempt
        raises any exception (missing path, bad argument, ...).
    """
    try:
        os.stat(path)
    except Exception:
        return False
    return True
def process_site(site):
    """Make sure a user-devices export for *site* exists on disk.

    A cached ``./user-devices/<site>.user-devices.json`` of at least 50 bytes
    is reused as is. Otherwise the site is checked for reachability, its
    version is read, credentials are fetched from 1Password and the export is
    downloaded with ``curl`` into that file. A status line is printed for
    every outcome.

    Args:
        site: Host name (without scheme) of the instance.

    Returns:
        True when a cached or freshly written export of at least 50 bytes is
        available, False otherwise.
    """
    min_bytes = 50  # anything smaller is treated as an empty/failed export
    device_file = f"./user-devices/{site}.user-devices.json"

    try:
        cached_bytes = os.path.getsize(device_file)
    except (OSError, ValueError):
        cached_bytes = None  # nothing cached yet
    if cached_bytes is not None:
        if cached_bytes >= min_bytes:
            print(f"SUCCESS: {site} cached {cached_bytes}b")
            return True
        # Too small to be a real export: warn, then download it again below.
        print(f"WARN: {site} cached {cached_bytes}b (less than 50b)")

    reachable, http_response = site_reachable(f"https://{site}")
    if not reachable:
        print(f"FAIL: {site} could not be reached. Response {http_response}")
        return False

    version = get_version(site)
    if not version:
        print(f"FAIL: {site} version not found")
        return False

    username, password = get_1p_password(site)
    if not password:
        print(f"FAIL: {site} on {version} could not get password from 1Password")
        return False

    auth = f"{username}:{password}"
    full_url = f"https://{site}/api/v2/export/user-devices"
    try:
        # Argument list + Python-side redirection: a quote in the password or
        # an odd character in the site name cannot break or alter the command.
        with open(device_file, "wb") as export:
            subprocess.run(["curl", "-sq", "-u", auth, full_url], stdout=export)
        written_bytes = os.path.getsize(device_file)
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        print(f"FAIL: {site} on {version} couldn't reach API: ({e})")
        return False

    # Same threshold as the cache check, so the "less than 50b" warning is
    # only printed for files that really are below 50 bytes.
    if written_bytes >= min_bytes:
        print(f"SUCCESS: {site} on {version} wrote {written_bytes}b")
        return True
    print(f"WARN: {site} on {version} wrote {written_bytes}b (less than 50b)")
    return False
# Download driver: fetch (or reuse the cached) export for every site listed
# in urls.txt. Individual failures are reported by process_site and do not
# stop the loop.
urls = get_urls("urls.txt")
print(f"Starting with {len(urls)} sites!")
for site in urls:
    process_site(site)
print("Done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment