Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
Last active August 14, 2025 15:29
Show Gist options
  • Select an option

  • Save zzstoatzz/c8a4ad709876c44cefd40e65ac983485 to your computer and use it in GitHub Desktop.

Select an option

Save zzstoatzz/c8a4ad709876c44cefd40e65ac983485 to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run --quiet --script
# /// script
# dependencies = ["prefect"]
# ///
"""
Asset management script for Prefect Cloud.
This script provides list, get, and delete operations for assets in Prefect Cloud
workspaces. Assets are created automatically by Prefect when events are emitted, so
there is no create/update command here; the script lets you list and delete them programmatically.
## Prerequisites
- uv must be installed
- You must be authenticated to Prefect Cloud
- You must have a workspace selected
## Setup
1. Install Prefect and authenticate:
```bash
uvx prefect cloud login
```
2. Select your workspace:
```bash
uvx prefect cloud workspace set --workspace your-workspace-name
```
## Usage
### List assets
```bash
python asset_manager.py list
python asset_manager.py list --limit 50 # Show only first 50 assets
```
### Delete old assets
```bash
# Dry run - see what would be deleted
python asset_manager.py delete-old --days 7 --dry-run
# Actually delete assets older than 7 days (with confirmation)
python asset_manager.py delete-old --days 7
# Delete without confirmation prompt
python asset_manager.py delete-old --days 7 --force
# Delete ALL assets (older than 0 days)
python asset_manager.py delete-old --days 0 --force
```
### Test connection
```bash
python asset_manager.py test
```
## How it works
- Assets are tracked by their "key" which is typically a URI (e.g., `slack://workspace/channel/123`)
- The script uses the `/assets/latest-dependencies` endpoint to determine when assets were last updated
- Assets without any materialization events are considered "old" and will be deleted
- All operations use the Prefect client which handles authentication automatically
## Examples
1. Clean up assets older than 30 days:
```bash
python asset_manager.py delete-old --days 30
```
2. Delete all assets in a workspace (use with caution!):
```bash
python asset_manager.py delete-old --days 0 --force
```
3. Check how many assets you have:
```bash
python asset_manager.py list | grep "Found"
```
"""
import argparse
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any
from urllib.parse import quote
from prefect import get_client
# Core asset functions
async def list_assets(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Fetch one page of assets from the ``/assets/`` endpoint.

    Args:
        limit: Page size forwarded to the API as the ``limit`` query parameter.
        offset: Start position forwarded to the API as the ``offset`` query
            parameter.

    Returns:
        The asset records. When the API answers with a bare JSON list, that
        list is returned only for ``offset == 0`` and an empty list otherwise;
        when it answers with an object, the value of its ``"results"`` key
        (or an empty list if the key is absent).

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error response.
    """
    query = {"limit": limit, "offset": offset}
    async with get_client() as client:
        response = await client._client.get("/assets/", params=query)
        response.raise_for_status()
        payload = response.json()

        # Wrapped response: the records live under "results".
        if not isinstance(payload, list):
            return payload.get("results", [])

        # Bare list: treated as the complete collection rather than a page, so
        # any request past the first offset yields nothing. This is what lets
        # callers that loop over offsets terminate.
        return payload if offset == 0 else []
async def get_asset(asset_key: str) -> dict[str, Any]:
    """Fetch a single asset record by its key.

    Args:
        asset_key: The asset key. It is percent-encoded in full (including
            ``/``) so it can be embedded as one URL path segment.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error response.
    """
    # safe="" forces "/" and ":" to be escaped too; keys are typically URIs.
    path = f"/assets/key/{quote(asset_key, safe='')}"
    async with get_client() as client:
        response = await client._client.get(path)
        response.raise_for_status()
        return response.json()
async def delete_asset(asset_key: str) -> None:
    """Delete a single asset identified by its key.

    Args:
        asset_key: The asset key. It is percent-encoded in full (including
            ``/``) so it can be embedded as one URL path segment.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error response.
    """
    # safe="" forces "/" and ":" to be escaped too; keys are typically URIs.
    path = f"/assets/key/{quote(asset_key, safe='')}"
    async with get_client() as client:
        response = await client._client.delete(path)
        response.raise_for_status()
async def _get_latest_dependencies() -> list[dict[str, Any]]:
    """Return the decoded JSON body of ``/assets/latest-dependencies``.

    Internal helper; callers read the ``upstream``, ``downstream`` and
    ``occurred`` fields of each returned record.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error response.
    """
    endpoint = "/assets/latest-dependencies"
    async with get_client() as client:
        reply = await client._client.get(endpoint)
        reply.raise_for_status()
        return reply.json()
# Main functions for different operations
def _parse_occurred(occurred: str) -> datetime:
    """Parse an ISO-8601 ``occurred`` string into a timezone-aware datetime."""
    # datetime.fromisoformat only understands a trailing "Z" from Python 3.11
    # on, so spell the UTC offset out explicitly.
    parsed = datetime.fromisoformat(occurred.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # A naive value cannot be compared with the aware cutoff (TypeError).
        # NOTE(review): assumes offset-less timestamps are UTC - confirm.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _latest_update_times(dependencies: list[dict[str, Any]]) -> dict[str, datetime]:
    """Map each asset key to the newest ``occurred`` time it appears with."""
    latest: dict[str, datetime] = {}
    for dep in dependencies:
        # An asset counts as "updated" whether it is the upstream or the
        # downstream side of the dependency record.
        keys = [key for key in (dep.get("upstream"), dep.get("downstream")) if key]
        occurred = dep.get("occurred")
        if not keys or not occurred:
            continue
        occurred_time = _parse_occurred(occurred)  # parsed once per record
        for key in keys:
            if key not in latest or occurred_time > latest[key]:
                latest[key] = occurred_time
    return latest


def _select_stale_assets(
    all_assets: list[dict[str, Any]],
    asset_last_updated: dict[str, datetime],
    cutoff_time: datetime,
) -> list[dict[str, Any]]:
    """Pick assets last updated before the cutoff, or with no known update."""
    stale = []
    for asset in all_assets:
        asset_key = asset["key"]
        last_updated = asset_last_updated.get(asset_key)
        # No recorded event at all -> considered old (last_updated stays None).
        if last_updated is None or last_updated < cutoff_time:
            stale.append({"key": asset_key, "last_updated": last_updated})
    return stale


async def delete_old_assets(days: int, dry_run: bool = False, force: bool = False) -> None:
    """Delete assets whose most recent dependency event is older than ``days`` days.

    All assets are fetched page by page, the latest-dependencies records are
    used to work out when each asset key was last seen, and every asset that
    is older than the cutoff - or has no record at all - is deleted.

    Args:
        days: Age threshold in days; ``0`` selects every asset updated before
            "now". Must not be negative.
        dry_run: When true, only report what would be deleted.
        force: When true, skip the interactive ``DELETE`` confirmation.

    Raises:
        ValueError: If ``days`` is negative. A negative value would put the
            cutoff in the future and silently select every asset.
    """
    if days < 0:
        raise ValueError(f"days must be zero or greater, got {days}")

    cutoff_time = datetime.now(timezone.utc) - timedelta(days=days)
    print(
        f"{'DRY RUN: ' if dry_run else ''}Deleting assets older than {cutoff_time.isoformat()}"
    )

    # Get all assets
    all_assets = []
    offset = 0
    limit = 100
    page = 0
    print("Fetching all assets...")
    while True:
        page += 1
        print(f" Fetching page {page} (offset {offset})...")
        assets = await list_assets(limit=limit, offset=offset)
        if not assets:
            break
        all_assets.extend(assets)
        # A short page means there is nothing further to fetch.
        if len(assets) < limit:
            break
        offset += limit
    print(f"Found {len(all_assets)} total assets")

    # Get latest dependencies to find last materialization times
    print("Fetching asset dependencies to determine last update times...")
    dependencies = await _get_latest_dependencies()
    asset_last_updated = _latest_update_times(dependencies)

    # Find assets to delete
    assets_to_delete = _select_stale_assets(all_assets, asset_last_updated, cutoff_time)
    print(f"\nFound {len(assets_to_delete)} assets to delete")
    if not assets_to_delete:
        print("No assets to delete")
        return

    # Sort by last_updated for display; assets without a timestamp sort first.
    assets_to_delete.sort(
        key=lambda x: x["last_updated"] or datetime.min.replace(tzinfo=timezone.utc)
    )

    # Display assets to be deleted
    print("\nAssets to delete:")
    for asset in assets_to_delete[:10]:  # Show first 10
        if asset["last_updated"]:
            print(
                f" - {asset['key']} (last updated: {asset['last_updated'].isoformat()})"
            )
        else:
            print(f" - {asset['key']} (no materialization events found)")
    if len(assets_to_delete) > 10:
        print(f" ... and {len(assets_to_delete) - 10} more")

    if dry_run:
        print("\nDRY RUN: No assets were deleted")
        return

    # Confirm deletion
    if not force:
        print(f"\n⚠️ WARNING: This will delete {len(assets_to_delete)} assets!")
        user_input = input("Type 'DELETE' to confirm: ")
        if user_input != "DELETE":
            print("Operation cancelled")
            return
    else:
        print(
            f"\n⚠️ WARNING: Force flag set - deleting {len(assets_to_delete)} assets without confirmation!"
        )

    # Delete assets
    deleted_count = 0
    failed_count = 0
    print("\nDeleting assets...")
    for asset in assets_to_delete:
        try:
            await delete_asset(asset["key"])
            deleted_count += 1
            if deleted_count % 10 == 0:
                print(f" Deleted {deleted_count}/{len(assets_to_delete)} assets...")
        except Exception as e:
            # Best effort: one failed deletion must not abort the whole run.
            print(f" ✗ Failed to delete {asset['key']}: {e}")
            failed_count += 1

    print(f"\n✓ Successfully deleted {deleted_count} assets")
    if failed_count > 0:
        print(f"✗ Failed to delete {failed_count} assets")
async def list_assets_main(limit: int = 100):
    """Print the key of every asset returned by one ``list_assets`` call.

    Args:
        limit: Page size passed through to ``list_assets``.
    """
    print("Listing assets...")
    found = await list_assets(limit=limit)
    print(f"Found {len(found)} assets:")
    for entry in found:
        print(f" - {entry['key']}")
async def test_connection():
    """Print the client's API URL, then list up to five assets as an access check."""
    async with get_client() as client:
        print(f"Connected to: {client.api_url}")

        # A small listing proves the credentials can actually read assets.
        sample = await list_assets(limit=5)
        print(f"\nFound {len(sample)} assets (showing up to 5)")
        for entry in sample:
            print(f" - {entry['key']}")
def _non_negative_int(value: str) -> int:
    """Argparse ``type=`` callable: parse an integer and reject negatives."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        raise argparse.ArgumentTypeError(f"must be zero or greater, got {number}")
    return number


def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands:
        delete-old: ``--days N`` (required, N >= 0), ``--dry-run``, ``--force``.
        list: ``--limit N`` (default 100).
        test: no options.

    With no sub-command the help text is printed and the function returns.
    Invalid arguments make argparse exit with status 2.
    """
    parser = argparse.ArgumentParser(
        description="Manage Prefect Cloud assets",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Delete assets older than 30 days (dry run)
%(prog)s delete-old --days 30 --dry-run
# Delete assets older than 7 days
%(prog)s delete-old --days 7
# List assets
%(prog)s list --limit 50
# Test connection
%(prog)s test
""",
    )
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # Delete old assets command
    delete_parser = subparsers.add_parser(
        "delete-old", help="Delete assets older than specified days"
    )
    delete_parser.add_argument(
        "--days",
        # A negative age would move the cutoff into the future and mark every
        # asset for deletion, so refuse it at the parsing stage.
        type=_non_negative_int,
        required=True,
        help="Delete assets older than this many days",
    )
    delete_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be deleted without actually deleting",
    )
    delete_parser.add_argument(
        "--force",
        action="store_true",
        help="Skip confirmation prompt (use with caution!)",
    )

    # List assets command
    list_parser = subparsers.add_parser("list", help="List assets in the workspace")
    list_parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="Maximum number of assets to list (default: 100)",
    )

    # Test connection command
    subparsers.add_parser("test", help="Test connection to Prefect Cloud")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Run the appropriate command
    if args.command == "delete-old":
        asyncio.run(delete_old_assets(args.days, args.dry_run, args.force))
    elif args.command == "list":
        asyncio.run(list_assets_main(args.limit))
    elif args.command == "test":
        asyncio.run(test_connection())
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment