Created
November 18, 2025 15:53
-
-
Save n0kovo/c460349c2e8ab033e3f9ec0746dc5852 to your computer and use it in GitHub Desktop.
Investigate CloudFlare outage impact by checking the status of top sites from CrUX
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Investigate CloudFlare outage impact by checking the status of top sites from CrUX. | |
| """ | |
| import asyncio | |
| from pathlib import Path | |
| from typing import Dict, List | |
| from httpx import AsyncClient, TimeoutException, ConnectError, HTTPStatusError, AsyncHTTPTransport | |
| from tqdm.asyncio import tqdm | |
| from google.cloud import bigquery | |
| import ssl | |
# Number of leading response-body bytes to scan for the challenge-page marker.
BYTES_TO_CHECK = 1024
class SiteList:
    """List of top-ranked CrUX origin URLs to probe.

    URLs are loaded from a local cache file when one exists, otherwise
    fetched from the public CrUX BigQuery dataset and cached for next time.
    """

    def __init__(self, magnitude: int = 10000):
        """Populate ``self.urls`` with up to ``magnitude`` unique origins.

        Args:
            magnitude: How many top-ranked CrUX origins to load/fetch.

        Exits the process if no URLs could be obtained.
        """
        self.urls: List[str] = []
        cache_filename = f"crux_top_{magnitude}_sites.txt"
        cache_path = Path(cache_filename)
        if cache_path.exists():
            print(f"Loading URLs from cache file: {cache_filename}")
            # BUG FIX: the original iterated read_text() directly, which
            # yields individual *characters*, not lines -- split on newlines.
            self.urls = list({
                line.strip()
                for line in cache_path.read_text().splitlines()
                if line.strip()
            })
            print(f"Successfully loaded {len(self.urls)} URLs from cache.")
            return
        print(f"Fetching top {magnitude} sites from CrUX BigQuery dataset...")
        try:
            client = bigquery.Client()
            query = f"""
                SELECT origin
                FROM `chrome-ux-report.all.202408`
                WHERE experimental.popularity.rank IS NOT NULL
                ORDER BY experimental.popularity.rank
                LIMIT {magnitude}
            """
            df = client.query(query).to_dataframe()
            # Deduplicate defensively; BigQuery should already return unique origins.
            self.urls = list(set(df["origin"]))
            try:
                cache_path.write_text('\n'.join(self.urls))
            except IOError as e:
                print(f"Warning: Could not write to cache file {cache_filename}: {e}")
        except Exception as e:
            # Best-effort: fall through to the emptiness check below.
            print(f"Error fetching data from BigQuery: {e}")
        if not self.urls:
            exit("Exiting: Cannot proceed without a list of URLs.")
async def check_url(client: AsyncClient, url: str) -> Dict:
    """Probe one URL and classify its Cloudflare status.

    Args:
        client: Shared httpx AsyncClient used for the request.
        url: Origin URL to check.

    Returns:
        Dict with keys:
            url           -- the URL that was checked
            is_cloudflare -- True if the Server header identifies Cloudflare
            is_impacted   -- True if the site returned a 5xx status or is
                             serving the "Just a moment..." challenge page

    Any network/protocol error is swallowed deliberately (best-effort scan
    over thousands of hosts) and reported as not-impacted.
    """
    uses_cf = False
    is_impacted = False
    challenge_phrase = b"<title>Just a moment...</title>"
    try:
        async with client.stream("GET", url, timeout=10, follow_redirects=True) as response:
            # The Server header is the cheapest Cloudflare fingerprint.
            if response.headers.get("Server", "").lower() == "cloudflare":
                uses_cf = True
            else:
                # Not behind Cloudflare -- nothing more to check.
                return {"url": url, "is_cloudflare": False, "is_impacted": False}
            # Condition 1: a 5xx server error is a clear sign of impact.
            if 500 <= response.status_code < 600:
                is_impacted = True
            else:
                # Condition 2: stream just enough of the body to detect the
                # challenge page; sites stuck on it are effectively unusable.
                first_bytes = bytearray()
                async for chunk in response.aiter_bytes():
                    first_bytes.extend(chunk)
                    if len(first_bytes) >= BYTES_TO_CHECK or challenge_phrase in first_bytes:
                        break
                if challenge_phrase in first_bytes:
                    is_impacted = True
    except Exception:
        # FIX: the original caught (TimeoutException, ConnectError,
        # HTTPStatusError, Exception) -- Exception already subsumes the
        # others, so the tuple was redundant and misleading.
        pass
    return {"url": url, "is_cloudflare": uses_cf, "is_impacted": is_impacted}
async def sem_check_url(semaphore: asyncio.Semaphore, client: AsyncClient, url: str) -> Dict:
    """Bound concurrency: run check_url only while holding a semaphore slot."""
    async with semaphore:
        result = await check_url(client, url)
    return result
async def main():
    """Orchestrate the scan: build the URL list, probe every site with
    bounded concurrency, then print summary statistics and save results."""
    SITES_TO_CHECK = 10000
    MAX_CONCURRENCY = 100

    sites = SiteList(magnitude=SITES_TO_CHECK)
    url_count = len(sites.urls)
    if url_count == 0:
        print("No sites to check. Exiting.")
        return

    gate = asyncio.Semaphore(MAX_CONCURRENCY)

    # TLS context that tolerates servers requiring legacy renegotiation;
    # the raw OpenSSL flag is used because ssl exposes no named constant.
    tls_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    tls_ctx.options |= 0x00040000  # OP flag SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION

    async with AsyncClient(transport=AsyncHTTPTransport(verify=tls_ctx)) as client:
        pending = [sem_check_url(gate, client, u) for u in sites.urls]
        results = await tqdm.gather(*pending, desc="Checking urls")

    # Aggregate results
    cf_urls = [item["url"] for item in results if item["is_cloudflare"]]
    down_urls = [item["url"] for item in results if item["is_cloudflare"] and item["is_impacted"]]

    print(f"Total urls checked: {url_count}")
    print(f"Cloudflare sites identified: {len(cf_urls)}")
    print(f"Impacted Cloudflare sites: {len(down_urls)}")
    if len(cf_urls) > 0:
        impact_percentage = (len(down_urls) / len(cf_urls)) * 100
        print(f"Impact rate: {impact_percentage:.2f}% of identified Cloudflare sites are impacted.")
    if down_urls:
        Path("impacted_sites.txt").write_text("\n".join(down_urls))
    else:
        print("\nNo impacted Cloudflare urls found.")
# Script entry point: drive the async scan on the default event loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment