Created
July 21, 2025 08:03
-
-
Save svrnm/1ccec8dd126a0b804e421ee385f6ed58 to your computer and use it in GitHub Desktop.
OTel Demo Flag Scheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| Chaos Scheduler Script for flagd-ui | |
| Continuously triggers random feature flags at specified intervals | |
| Usage: python chaos-scheduler.py <interval> [--dry-run] [--seed <number>] | |
| Example: python chaos-scheduler.py 15min --dry-run --seed 42 | |
| """ | |
| import sys | |
| import time | |
| import random | |
| import subprocess | |
| import argparse | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Tuple | |
| class ChaosScheduler: | |
| def __init__(self): | |
| self.dry_run = False | |
| self.interval_seconds = 0 | |
| self.seed = None | |
| self.toggle_script = "./toggle-flags.py" | |
| self.base_url = "http://localhost:8080/feature/api" # Default base URL | |
| # Available flags that can be turned "on" (non-off variants) | |
| self.available_flags = { | |
| "productCatalogFailure": ["on"], | |
| "recommendationCacheFailure": ["on"], | |
| "adManualGc": ["on"], | |
| "adHighCpu": ["on"], | |
| "adFailure": ["on"], | |
| "kafkaQueueProblems": ["on"], | |
| "cartFailure": ["on"], | |
| "paymentFailure": ["100%", "90%", "75%", "50%", "25%", "10%"], | |
| "paymentUnreachable": ["on"], | |
| "loadGeneratorFloodHomepage": ["on"], | |
| "imageSlowLoad": ["10sec", "5sec"] | |
| } | |
| def show_usage(self): | |
| """Display usage information""" | |
| print("Usage: python chaos-scheduler.py <interval> [--dry-run] [--seed <number>] [--base-url URL]") | |
| print("") | |
| print("Interval formats:") | |
| print(" 30sec, 1min, 15min, 1h, 2h, 1day") | |
| print(" Examples: 30sec, 5min, 1h, 2day") | |
| print("") | |
| print("Options:") | |
| print(" --dry-run Only print what would be done, don't make actual API calls") | |
| print(" --seed <number> Use specific seed for reproducible random patterns") | |
| print(" --base-url URL Base URL for flagd-ui API (default: http://localhost:8080/feature/api)") | |
| print("") | |
| print("Examples:") | |
| print(" python chaos-scheduler.py 15min") | |
| print(" python chaos-scheduler.py 1h --dry-run") | |
| print(" python chaos-scheduler.py 30sec --seed 12345") | |
| print(" python chaos-scheduler.py 5min --dry-run --seed 42") | |
| print(" python chaos-scheduler.py 15min --base-url http://remote-server:9090/feature/api") | |
| print("") | |
| print("Reproducibility:") | |
| print(" Using the same seed will produce identical chaos patterns across runs.") | |
| print(" The seed will be logged at startup for easy reproduction.") | |
| print("") | |
| print("Note: Requires toggle-flags.py to be in the same directory") | |
| def parse_interval(self, interval_str: str) -> int: | |
| """Parse time interval string to seconds""" | |
| match = re.match(r'(\d+)([a-zA-Z]+)', interval_str) | |
| if not match: | |
| return 0 | |
| number = int(match.group(1)) | |
| unit = match.group(2).lower() | |
| unit_multipliers = { | |
| 'sec': 1, 'second': 1, 'seconds': 1, | |
| 'min': 60, 'minute': 60, 'minutes': 60, | |
| 'h': 3600, 'hour': 3600, 'hours': 3600, | |
| 'day': 86400, 'days': 86400 | |
| } | |
| multiplier = unit_multipliers.get(unit) | |
| if multiplier is None: | |
| return 0 | |
| return number * multiplier | |
| def log_message(self, message: str): | |
| """Log message with timestamp""" | |
| timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| print(f"[{timestamp}] {message}") | |
| def format_duration(self, seconds: int) -> str: | |
| """Format duration for display""" | |
| if seconds < 60: | |
| return f"{seconds}s" | |
| elif seconds < 3600: | |
| minutes = seconds // 60 | |
| remaining_seconds = seconds % 60 | |
| return f"{minutes}m {remaining_seconds}s" | |
| elif seconds < 86400: | |
| hours = seconds // 3600 | |
| remaining_minutes = (seconds % 3600) // 60 | |
| return f"{hours}h {remaining_minutes}m" | |
| else: | |
| days = seconds // 86400 | |
| remaining_hours = (seconds % 86400) // 3600 | |
| return f"{days}d {remaining_hours}h" | |
| def get_random_flag_variant(self) -> Tuple[str, str]: | |
| """Get random flag and non-off variant""" | |
| flag_name = random.choice(list(self.available_flags.keys())) | |
| variant = random.choice(self.available_flags[flag_name]) | |
| return flag_name, variant | |
| def set_flag(self, flag_name: str, variant: str) -> bool: | |
| """Set flag using toggle-flags.py""" | |
| # Build command with base_url if not default | |
| cmd = [sys.executable, self.toggle_script, flag_name, variant] | |
| if self.base_url != "http://localhost:8080/feature/api": | |
| cmd.extend(["--base-url", self.base_url]) | |
| if self.dry_run: | |
| cmd_str = " ".join(cmd) | |
| self.log_message(f"🧪 [DRY-RUN] Would run: {cmd_str}") | |
| return True | |
| try: | |
| # Use the existing toggle-flags.py script | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) | |
| if result.returncode == 0: | |
| return True | |
| else: | |
| self.log_message(f"❌ Failed to set {flag_name} to {variant} using {self.toggle_script}") | |
| if result.stderr: | |
| self.log_message(f"Error: {result.stderr.strip()}") | |
| return False | |
| except subprocess.TimeoutExpired: | |
| self.log_message(f"❌ Timeout setting {flag_name} to {variant}") | |
| return False | |
| except Exception as e: | |
| self.log_message(f"❌ Error setting {flag_name} to {variant}: {e}") | |
| return False | |
| def chaos_loop(self): | |
| """Main chaos loop""" | |
| interval_count = 0 | |
| self.log_message(f"🚀 Starting Chaos Scheduler with interval: {self.format_duration(self.interval_seconds)}") | |
| if self.dry_run: | |
| self.log_message("🧪 Running in DRY-RUN mode - no actual changes will be made") | |
| if self.seed is not None: | |
| self.log_message(f"🎲 Using seed: {self.seed} (for reproducible patterns)") | |
| else: | |
| current_seed = random.randrange(2**31) | |
| random.seed(current_seed) | |
| self.log_message(f"🎲 Using random seed: {current_seed} (use --seed {current_seed} to reproduce this run)") | |
| try: | |
| while True: | |
| interval_count += 1 | |
| interval_start_time = time.time() | |
| interval_end_time = interval_start_time + self.interval_seconds | |
| self.log_message(f"📅 Beginning interval #{interval_count} (duration: {self.format_duration(self.interval_seconds)})") | |
| # Calculate random trigger time within this interval (0 to interval_seconds) | |
| random_offset = random.randint(0, max(0, self.interval_seconds - 1)) | |
| trigger_time = interval_start_time + random_offset | |
| trigger_time_str = datetime.fromtimestamp(trigger_time).strftime('%H:%M:%S') | |
| self.log_message(f"⏰ Next trigger scheduled in {self.format_duration(random_offset)} (at {trigger_time_str})") | |
| # Wait until trigger time | |
| current_time = time.time() | |
| if current_time < trigger_time: | |
| time.sleep(trigger_time - current_time) | |
| # Get random flag and variant | |
| flag_name, variant = self.get_random_flag_variant() | |
| # Calculate remaining time in interval for maximum duration | |
| current_time = time.time() | |
| remaining_time = int(interval_end_time - current_time) | |
| if remaining_time <= 0: | |
| self.log_message("⚠️ Interval ended before flag could be triggered, moving to next interval") | |
| continue | |
| # Random duration flag stays on (1 second to remaining time) | |
| flag_duration = random.randint(1, max(1, remaining_time)) | |
| self.log_message(f"🎯 TRIGGER: Setting {flag_name} to {variant} for {self.format_duration(flag_duration)}") | |
| # Set flag to active state | |
| if self.set_flag(flag_name, variant): | |
| if not self.dry_run: | |
| self.log_message(f"✅ Successfully activated {flag_name} = {variant}") | |
| # Wait for the flag duration | |
| time.sleep(flag_duration) | |
| # Set flag back to off | |
| self.log_message(f"🔄 REVERT: Setting {flag_name} back to off") | |
| if self.set_flag(flag_name, "off"): | |
| if not self.dry_run: | |
| self.log_message(f"✅ Successfully reverted {flag_name} to off") | |
| # Wait until end of interval | |
| current_time = time.time() | |
| if current_time < interval_end_time: | |
| wait_time = int(interval_end_time - current_time) | |
| if wait_time > 0: | |
| self.log_message(f"⏸️ Waiting {self.format_duration(wait_time)} until end of interval") | |
| time.sleep(wait_time) | |
| self.log_message(f"🏁 End of interval #{interval_count}") | |
| print("----------------------------------------") | |
| except KeyboardInterrupt: | |
| self.log_message("🛑 Received shutdown signal, exiting gracefully...") | |
| sys.exit(0) | |
| def validate_toggle_script(self): | |
| """Check if toggle-flags.py exists and is executable""" | |
| script_path = Path(self.toggle_script) | |
| if not script_path.exists(): | |
| print(f"Error: {self.toggle_script} not found in current directory") | |
| print("Please ensure toggle-flags.py is in the same directory as this script") | |
| sys.exit(1) | |
| # Test if the script is runnable | |
| try: | |
| cmd = [sys.executable, self.toggle_script, "--help"] | |
| if self.base_url != "http://localhost:8080/feature/api": | |
| cmd.extend(["--base-url", self.base_url]) | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) | |
| if result.returncode != 0: | |
| print(f"Error: {self.toggle_script} is not working properly") | |
| print("Please ensure toggle-flags.py is functional") | |
| sys.exit(1) | |
| except Exception as e: | |
| print(f"Error: Cannot execute {self.toggle_script}: {e}") | |
| sys.exit(1) | |
| def parse_args(self): | |
| """Parse command line arguments""" | |
| if len(sys.argv) == 1 or sys.argv[1] in ["-h", "--help"]: | |
| self.show_usage() | |
| sys.exit(0) | |
| parser = argparse.ArgumentParser(description="Chaos Scheduler for flagd-ui", add_help=False) | |
| parser.add_argument("interval", help="Time interval (e.g., 15min, 1h, 30sec)") | |
| parser.add_argument("--dry-run", "-n", action="store_true", help="Only print what would be done") | |
| parser.add_argument("--seed", type=int, help="Seed for reproducible random patterns") | |
| parser.add_argument("--base-url", default="http://localhost:8080/feature/api", | |
| help="Base URL for flagd-ui API (default: http://localhost:8080/feature/api)") | |
| try: | |
| args = parser.parse_args() | |
| except SystemExit: | |
| self.show_usage() | |
| sys.exit(1) | |
| # Parse interval | |
| self.interval_seconds = self.parse_interval(args.interval) | |
| if self.interval_seconds == 0: | |
| print(f"Error: Invalid interval format '{args.interval}'") | |
| self.show_usage() | |
| sys.exit(1) | |
| # Validate minimum interval (10 seconds) | |
| if self.interval_seconds < 10: | |
| print("Error: Interval must be at least 10 seconds") | |
| sys.exit(1) | |
| # Set other options | |
| self.dry_run = args.dry_run | |
| self.base_url = args.base_url | |
| if args.seed is not None: | |
| self.seed = args.seed | |
| random.seed(self.seed) | |
| def run(self): | |
| """Main entry point""" | |
| self.parse_args() | |
| self.validate_toggle_script() | |
| self.chaos_loop() | |
| def main(): | |
| """Main function""" | |
| scheduler = ChaosScheduler() | |
| scheduler.run() | |
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| Feature Flag Toggle Script for flagd-ui | |
| Usage: python toggle-flags.py <flag_name> <variant> | |
| Example: python toggle-flags.py adFailure on | |
| """ | |
| import sys | |
| import json | |
| import requests | |
| import argparse | |
| from typing import Dict, Any, Optional | |
| # Default Base URL for the flagd-ui API | |
| DEFAULT_BASE_URL = "http://localhost:8080/feature/api" | |
| BASE_URL = DEFAULT_BASE_URL # Will be updated by argument parsing | |
| def show_usage(): | |
| """Display usage information""" | |
| print("Usage: python toggle-flags.py <flag_name> <variant> [--base-url URL]") | |
| print("") | |
| print("Available flags and variants:") | |
| print(" productCatalogFailure: on, off") | |
| print(" recommendationCacheFailure: on, off") | |
| print(" adManualGc: on, off") | |
| print(" adHighCpu: on, off") | |
| print(" adFailure: on, off") | |
| print(" kafkaQueueProblems: on, off") | |
| print(" cartFailure: on, off") | |
| print(" paymentFailure: 100%, 90%, 75%, 50%, 25%, 10%, off") | |
| print(" paymentUnreachable: on, off") | |
| print(" loadGeneratorFloodHomepage: on, off") | |
| print(" imageSlowLoad: 10sec, 5sec, off") | |
| print("") | |
| print("Options:") | |
| print(f" --base-url URL Base URL for flagd-ui API (default: {DEFAULT_BASE_URL})") | |
| print("") | |
| print("Examples:") | |
| print(" python toggle-flags.py adFailure on") | |
| print(" python toggle-flags.py paymentFailure 50%") | |
| print(" python toggle-flags.py imageSlowLoad 5sec") | |
| print(" python toggle-flags.py list # Show current status") | |
| print(" python toggle-flags.py adFailure on --base-url http://custom-host:9090/feature/api") | |
| def get_variant_value(flag_name: str, variant: str) -> Optional[Any]: | |
| """Get the variant value based on flag and variant name""" | |
| boolean_flags = [ | |
| "productCatalogFailure", "recommendationCacheFailure", "adManualGc", | |
| "adHighCpu", "adFailure", "cartFailure", "paymentUnreachable" | |
| ] | |
| numeric_flags = ["kafkaQueueProblems", "loadGeneratorFloodHomepage"] | |
| if flag_name in boolean_flags: | |
| if variant == "on": | |
| return True | |
| elif variant == "off": | |
| return False | |
| else: | |
| return None | |
| elif flag_name in numeric_flags: | |
| if variant == "on": | |
| return 100 | |
| elif variant == "off": | |
| return 0 | |
| else: | |
| return None | |
| elif flag_name == "paymentFailure": | |
| variant_map = { | |
| "100%": 1, | |
| "90%": 0.95, | |
| "75%": 0.75, | |
| "50%": 0.5, | |
| "25%": 0.25, | |
| "10%": 0.1, | |
| "off": 0 | |
| } | |
| return variant_map.get(variant) | |
| elif flag_name == "imageSlowLoad": | |
| variant_map = { | |
| "10sec": 10000, | |
| "5sec": 5000, | |
| "off": 0 | |
| } | |
| return variant_map.get(variant) | |
| return None | |
| def get_all_variants(flag_name: str) -> Dict[str, Any]: | |
| """Get all flag variants for JSON construction""" | |
| boolean_flags = [ | |
| "productCatalogFailure", "recommendationCacheFailure", "adManualGc", | |
| "adHighCpu", "adFailure", "cartFailure", "paymentUnreachable" | |
| ] | |
| numeric_flags = ["kafkaQueueProblems", "loadGeneratorFloodHomepage"] | |
| if flag_name in boolean_flags: | |
| return {"on": True, "off": False} | |
| elif flag_name in numeric_flags: | |
| return {"on": 100, "off": 0} | |
| elif flag_name == "paymentFailure": | |
| return { | |
| "100%": 1, | |
| "90%": 0.95, | |
| "75%": 0.75, | |
| "50%": 0.5, | |
| "25%": 0.25, | |
| "10%": 0.1, | |
| "off": 0 | |
| } | |
| elif flag_name == "imageSlowLoad": | |
| return {"10sec": 10000, "5sec": 5000, "off": 0} | |
| return {} | |
| def get_flag_description(flag_name: str) -> str: | |
| """Get flag description""" | |
| descriptions = { | |
| "productCatalogFailure": "Fail product catalog service on a specific product", | |
| "recommendationCacheFailure": "Fail recommendation service cache", | |
| "adManualGc": "Triggers full manual garbage collections in the ad service", | |
| "adHighCpu": "Triggers high cpu load in the ad service", | |
| "adFailure": "Fail ad service", | |
| "kafkaQueueProblems": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike", | |
| "cartFailure": "Fail cart service", | |
| "paymentFailure": "Fail payment service charge requests n%", | |
| "paymentUnreachable": "Payment service is unavailable", | |
| "loadGeneratorFloodHomepage": "Flood the frontend with a large amount of requests.", | |
| "imageSlowLoad": "slow loading images in the frontend" | |
| } | |
| return descriptions.get(flag_name, "") | |
| def construct_json(target_flag: str, target_variant: str) -> Dict[str, Any]: | |
| """Construct the complete JSON payload""" | |
| all_flags = [ | |
| "productCatalogFailure", "recommendationCacheFailure", "adManualGc", | |
| "adHighCpu", "adFailure", "kafkaQueueProblems", "cartFailure", | |
| "paymentFailure", "paymentUnreachable", "loadGeneratorFloodHomepage", | |
| "imageSlowLoad" | |
| ] | |
| flags = {} | |
| for flag_name in all_flags: | |
| # Set default variant based on target flag | |
| if flag_name == target_flag: | |
| default_variant = target_variant | |
| else: | |
| # Set sensible defaults for other flags | |
| if flag_name == "productCatalogFailure": | |
| default_variant = "on" | |
| else: | |
| default_variant = "off" | |
| flags[flag_name] = { | |
| "description": get_flag_description(flag_name), | |
| "state": "ENABLED", | |
| "variants": get_all_variants(flag_name), | |
| "defaultVariant": default_variant | |
| } | |
| return { | |
| "data": { | |
| "$schema": "https://flagd.dev/schema/v0/flags.json", | |
| "flags": flags | |
| } | |
| } | |
| def list_current_status(): | |
| """List current flag status""" | |
| try: | |
| print("Fetching current flag status...") | |
| response = requests.get(f"{BASE_URL}/read-file", | |
| headers={'Content-Type': 'application/json'}, | |
| timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if 'flags' in data: | |
| for flag_name, flag_data in data['flags'].items(): | |
| default_variant = flag_data.get('defaultVariant', 'unknown') | |
| description = flag_data.get('description', '') | |
| print(f"{flag_name}: {default_variant} ({description})") | |
| else: | |
| print("No flags found in response") | |
| else: | |
| print(f"Error: HTTP {response.status_code}") | |
| print(f"Ensure flagd-ui is running and accessible at {BASE_URL}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error connecting to flagd-ui: {e}") | |
| print(f"Ensure flagd-ui is running and accessible at {BASE_URL}") | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| def set_flag(flag_name: str, variant: str) -> bool: | |
| """Set a flag to a specific variant""" | |
| # Validate flag name and variant | |
| variant_value = get_variant_value(flag_name, variant) | |
| if variant_value is None: | |
| print(f"Error: Invalid flag name '{flag_name}' or variant '{variant}'") | |
| show_usage() | |
| return False | |
| print(f"Setting {flag_name} to {variant}...") | |
| try: | |
| # Construct JSON payload | |
| json_payload = construct_json(flag_name, variant) | |
| # Make the API call | |
| response = requests.post(f"{BASE_URL}/write-to-file", | |
| json=json_payload, | |
| headers={'Content-Type': 'application/json'}, | |
| timeout=10) | |
| if response.status_code in [200, 201]: | |
| print(f"✅ Successfully set {flag_name} to {variant}") | |
| return True | |
| else: | |
| print(f"❌ Failed to set flag. HTTP Code: {response.status_code}") | |
| print(f"Response: {response.text}") | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| print(f"❌ Failed to connect to flagd-ui: {e}") | |
| print(f"Ensure flagd-ui is running and accessible at {BASE_URL}") | |
| return False | |
| except Exception as e: | |
| print(f"❌ Error: {e}") | |
| return False | |
| def main(): | |
| """Main script logic""" | |
| global BASE_URL | |
| parser = argparse.ArgumentParser( | |
| description="Feature Flag Toggle Script for flagd-ui", | |
| add_help=False # We'll handle help ourselves | |
| ) | |
| parser.add_argument("flag_name", nargs="?", help="Flag name to toggle") | |
| parser.add_argument("variant", nargs="?", help="Variant to set (on/off/percentage/etc)") | |
| parser.add_argument("--base-url", default=DEFAULT_BASE_URL, | |
| help=f"Base URL for flagd-ui API (default: {DEFAULT_BASE_URL})") | |
| parser.add_argument("-h", "--help", action="store_true", help="Show help message") | |
| try: | |
| args = parser.parse_args() | |
| except SystemExit: | |
| show_usage() | |
| sys.exit(1) | |
| # Update global BASE_URL from argument | |
| BASE_URL = args.base_url | |
| # Handle help | |
| if args.help or not args.flag_name: | |
| show_usage() | |
| sys.exit(0) | |
| # Handle list command | |
| if args.flag_name == "list": | |
| list_current_status() | |
| sys.exit(0) | |
| # Validate we have both flag_name and variant | |
| if not args.variant: | |
| print("Error: Both flag_name and variant are required") | |
| show_usage() | |
| sys.exit(1) | |
| success = set_flag(args.flag_name, args.variant) | |
| sys.exit(0 if success else 1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment