Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
zzstoatzz / tap_consumer.py
Last active December 13, 2025 08:36
consume ATProto lexicon events from tap
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["websockets", "rich"]
# ///
"""consume ATProto lexicon events from tap with formatted output.
automatically starts tap if not running.
usage:
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "prefect>=3.0",
# "prefect-dbt>=0.7.9",
# "dbt-duckdb>=1.9",
# ]
# ///
"""
dbt + Prefect: Resume-from-Failure Demo
"""proof of concept: image moderation with pydantic-ai.
tests pydantic-ai's ability to analyze images and detect policy violations
with different policy strictness levels.
usage:
uv run sandbox/test_image_moderation.py
"""
import asyncio
@zzstoatzz
zzstoatzz / sync_status.py
Created September 8, 2025 05:31
syncs my global status to all third-party status services (Slack, GitHub)
# /// script
# requires-python = ">=3.12"
# dependencies = ["aiohttp", "pydantic-settings"]
# ///
"""
Status Syndication Script
Syncs status from status.zzstoatzz.io to various third-party services
"""
@zzstoatzz
zzstoatzz / transfer_animation.py
Created August 20, 2025 01:29
must be run from a checkout of github.com/prefecthq/prefect (requires an editable install of Prefect)
#!/usr/bin/env -S uv run -q --with-editable .
# /// script
# dependencies = ["matplotlib", "networkx", "rich"]
# ///
"""Animation showing the Prefect transfer process as a DAG visualization."""
import argparse
import asyncio
import os
import sys
did:plc:o53crari67ge7bvbv273lxln
#!/usr/bin/env -S uv run --quiet --script
# /// script
# dependencies = ["prefect"]
# ///
"""
Asset management script for Prefect Cloud.
This script provides CRUD operations for managing assets in Prefect Cloud workspaces.
Assets are created automatically by Prefect when events are emitted, but this script
@zzstoatzz
zzstoatzz / output
Created June 9, 2025 16:42
» uvx --with django pretty-mod tree django --depth 5 | pbcopy
📦 django
├── ⚡ functions: setup
├── 📌 constants: VERSION
├── 📦 apps
│ ├── 📜 __all__: AppConfig, apps
│ ├── 📦 config
│ │ ├── 🔷 classes: AppConfig
│ │ └── 📌 constants: APPS_MODULE_NAME, MODELS_MODULE_NAME
│ └── 📦 registry
│ └── 🔷 classes: Apps
import asyncio
from prefect import flow, task
@task
async def sleepy_task() -> None:
    """Pause for 15 seconds using ``asyncio.sleep``, then return ``None``."""
    nap_seconds = 15
    await asyncio.sleep(nap_seconds)