Skip to content

Instantly share code, notes, and snippets.

@n0kovo
Created November 18, 2025 15:53
Show Gist options
  • Select an option

  • Save n0kovo/c460349c2e8ab033e3f9ec0746dc5852 to your computer and use it in GitHub Desktop.

Select an option

Save n0kovo/c460349c2e8ab033e3f9ec0746dc5852 to your computer and use it in GitHub Desktop.
Investigate CloudFlare outage impact by checking the status of top sites from CrUX
"""
Investigate Cloudflare outage impact by checking the status of top sites from CrUX (the Chrome UX Report).
"""
import asyncio
from pathlib import Path
from typing import Dict, List
from httpx import AsyncClient, TimeoutException, ConnectError, HTTPStatusError, AsyncHTTPTransport
from tqdm.asyncio import tqdm
from google.cloud import bigquery
import ssl
# Number of response-body bytes check_url accumulates (chunk by chunk) before
# it stops looking for the challenge-page title.
BYTES_TO_CHECK = 1024
class SiteList:
    """Top-ranked site origins from CrUX, backed by a local cache file.

    On construction the origins are read from ``crux_top_<magnitude>_sites.txt``
    in the current working directory when that file exists; otherwise they are
    queried from the CrUX BigQuery table and written to that file.

    Attributes:
        urls: De-duplicated origins, kept in first-seen order.
    """

    def __init__(self, magnitude: int = 10000):
        """Load the origins from the cache file or, failing that, from BigQuery.

        Args:
            magnitude: Row limit used in the BigQuery query; it is also part
                of the cache file name.

        Raises:
            SystemExit: If there is no cache file and no origin could be
                obtained from BigQuery.
        """
        self.urls: List[str] = []
        cache_filename = f"crux_top_{magnitude}_sites.txt"
        cache_path = Path(cache_filename)

        if cache_path.exists():
            print(f"Loading URLs from cache file: {cache_filename}")
            # Split into lines first: iterating the string returned by
            # read_text() directly yields single characters, not URLs.
            stripped = (line.strip() for line in cache_path.read_text().splitlines())
            # dict.fromkeys de-duplicates while keeping the file's order.
            self.urls = list(dict.fromkeys(url for url in stripped if url))
            print(f"Successfully loaded {len(self.urls)} URLs from cache.")
            return

        print(f"Fetching top {magnitude} sites from CrUX BigQuery dataset...")
        try:
            client = bigquery.Client()
            query = f"""
                SELECT origin
                FROM `chrome-ux-report.all.202408`
                WHERE experimental.popularity.rank IS NOT NULL
                ORDER BY experimental.popularity.rank
                LIMIT {magnitude}
            """
            df = client.query(query).to_dataframe()
            # Keep the rank order returned by the query while de-duplicating.
            self.urls = list(dict.fromkeys(df["origin"]))
            try:
                cache_path.write_text("\n".join(self.urls))
            except OSError as e:
                # A failed cache write is not fatal; the URLs are in memory.
                print(f"Warning: Could not write to cache file {cache_filename}: {e}")
        except Exception as e:
            # Best effort: report the failure; the emptiness check below
            # decides whether the run can continue.
            print(f"Error fetching data from BigQuery: {e}")

        if not self.urls:
            raise SystemExit("Exiting: Cannot proceed without a list of URLs.")
async def check_url(client: AsyncClient, url: str) -> Dict:
    """Probe one URL and classify it as Cloudflare-served and/or impacted.

    The URL is fetched with a streaming GET (10 second timeout, redirects
    followed). A response counts as Cloudflare-served when its ``Server``
    header equals ``cloudflare`` (case-insensitive). A Cloudflare-served
    response counts as impacted when its status is 5xx or when the start of
    its body contains the "Just a moment..." challenge-page title.

    Args:
        client: Client used to issue the request.
        url: Address to probe.

    Returns:
        A dict with the keys ``url``, ``is_cloudflare`` and ``is_impacted``.
        Any exception raised during the request is swallowed and the flags
        determined so far are returned.
    """
    challenge_title = b"<title>Just a moment...</title>"
    behind_cloudflare = False
    impacted = False

    try:
        async with client.stream("GET", url, timeout=10, follow_redirects=True) as response:
            server_header = response.headers.get("Server", "")
            if server_header.lower() != "cloudflare":
                # Not a Cloudflare site: nothing more to inspect.
                return {"url": url, "is_cloudflare": False, "is_impacted": False}
            behind_cloudflare = True

            if 500 <= response.status_code < 600:
                # A 5xx answer from Cloudflare is a clear sign of impact.
                impacted = True
            else:
                # Otherwise look for the challenge page in the first part of
                # the body; sites stuck behind the challenge are unusable.
                body_head = bytearray()
                async for piece in response.aiter_bytes():
                    body_head.extend(piece)
                    if challenge_title in body_head or len(body_head) >= BYTES_TO_CHECK:
                        break
                impacted = challenge_title in body_head
    except Exception:
        # Covers timeouts, connection errors and everything else: a failed
        # probe simply keeps whatever flags were set before the failure.
        pass

    return {"url": url, "is_cloudflare": behind_cloudflare, "is_impacted": impacted}
async def sem_check_url(semaphore: asyncio.Semaphore, client: AsyncClient, url: str) -> Dict:
    """Run check_url for *url* while holding a slot of *semaphore*.

    The semaphore is acquired before the probe starts and released once it
    has finished; the probe's result dict is returned unchanged.
    """
    async with semaphore:
        outcome = await check_url(client, url)
    return outcome
async def main():
    """Probe every site in the list and print a Cloudflare impact summary.

    Builds the site list, checks all URLs concurrently through one shared
    client (at most 100 probes at a time), prints the totals and writes the
    impacted URLs to ``impacted_sites.txt`` when there are any.
    """
    site_count = 10000
    concurrency_limit = 100

    sites = SiteList(magnitude=site_count)
    total_sites = len(sites.urls)
    if not sites.urls:
        print("No sites to check. Exiting.")
        return

    limiter = asyncio.Semaphore(concurrency_limit)

    # 0x00040000 is OpenSSL's SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION flag.
    allow_legacy_renegotiation = 0x00040000
    tls_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    tls_context.options |= allow_legacy_renegotiation

    async with AsyncClient(transport=AsyncHTTPTransport(verify=tls_context)) as client:
        pending = [sem_check_url(limiter, client, address) for address in sites.urls]
        results = await tqdm.gather(*pending, desc="Checking urls")

    # Aggregate results: impacted sites are a subset of the Cloudflare ones.
    cloudflare_results = [entry for entry in results if entry["is_cloudflare"]]
    cloudflare_sites = [entry["url"] for entry in cloudflare_results]
    impacted_sites = [entry["url"] for entry in cloudflare_results if entry["is_impacted"]]

    print(f"Total urls checked: {total_sites}")
    print(f"Cloudflare sites identified: {len(cloudflare_sites)}")
    print(f"Impacted Cloudflare sites: {len(impacted_sites)}")

    if cloudflare_sites:
        impact_percentage = (len(impacted_sites) / len(cloudflare_sites)) * 100
        print(f"Impact rate: {impact_percentage:.2f}% of identified Cloudflare sites are impacted.")

    if not impacted_sites:
        print("\nNo impacted Cloudflare urls found.")
    else:
        Path("impacted_sites.txt").write_text("\n".join(impacted_sites))
# Script entry point: run the async main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment