Last active
December 3, 2025 00:46
-
-
Save zzstoatzz/138c82fcc6e7b7e411254f3ef8bdcbd7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "prefect>=3.0", | |
| # "prefect-dbt>=0.7.9", | |
| # "dbt-duckdb>=1.9", | |
| # ] | |
| # /// | |
| """ | |
| dbt + Prefect: Resume-from-Failure Demo | |
| Q: If 4 tests run and 1 fails, do we retry all tests? | |
| A: NO - `dbt retry` only re-runs the failed test(s). | |
| Run: | |
| uv run https://gist.githubusercontent.com/.../dbt_retry_demo.py | |
| """ | |
| import tempfile | |
| from pathlib import Path | |
| from prefect import flow | |
| from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings | |
def create_project(project_dir: Path):
    """Create a minimal dbt project with 1 model and 4 tests.

    Files written under ``project_dir``:

    * ``dbt_project.yml`` - project config pointing at the ``demo`` profile.
    * ``profiles/profiles.yml`` - a ``demo`` profile whose ``dev`` target is a
      DuckDB database stored at ``<project_dir>/demo.duckdb``.
    * ``models/users.sql`` - a two-row ``users`` model.
    * ``models/schema.yml`` - three generic tests on ``users``
      (``unique`` + ``not_null`` on ``id``, ``not_null`` on ``name``).
    * ``tests/bad_test.sql`` - a singular test that returns a row (and so
      fails) because the model has fewer than 10 rows.

    Args:
        project_dir: Root directory of the project. It is created, including
            missing parents, when it does not exist; existing files are
            overwritten.
    """
    project_dir.mkdir(parents=True, exist_ok=True)
    (project_dir / "dbt_project.yml").write_text(
        "name: demo\nversion: '1.0'\nprofile: demo\n"
        "model-paths: ['models']\ntest-paths: ['tests']"
    )

    profiles = project_dir / "profiles"
    profiles.mkdir(exist_ok=True)
    # YAML nesting is significant: `dev` must sit under `outputs`, and the
    # connection keys under `dev`, otherwise the profile defines no target.
    # Forward slashes keep the path a plain, portable YAML scalar.
    database_path = (project_dir / "demo.duckdb").as_posix()
    (profiles / "profiles.yml").write_text(
        "demo:\n"
        "  target: dev\n"
        "  outputs:\n"
        "    dev:\n"
        "      type: duckdb\n"
        f"      path: {database_path}"
    )

    models = project_dir / "models"
    models.mkdir(exist_ok=True)
    (models / "users.sql").write_text(
        "select 1 as id, 'alice' as name union all select 2, 'bob'"
    )
    # `columns` belongs to the `users` list item, and each column's `tests`
    # belongs to that column, so every level needs its own indentation.
    (models / "schema.yml").write_text(
        "version: 2\n"
        "models:\n"
        "  - name: users\n"
        "    columns:\n"
        "      - name: id\n"
        "        tests: [unique, not_null]\n"
        "      - name: name\n"
        "        tests: [not_null]"
    )

    tests = project_dir / "tests"
    tests.mkdir(exist_ok=True)
    # This test fails: bad assertion (will fix by correcting the test)
    (tests / "bad_test.sql").write_text(
        "-- This test incorrectly expects 10 users\n"
        "select 1 where (select count(*) from {{ ref('users') }}) < 10"
    )
# Demo flow: build a throwaway dbt project in which one test fails, repair
# that test, then run `dbt retry` so the output shows which nodes re-run.
# (Kept as comments rather than a docstring so the flow's docstring-derived
# description stays as it was.)
@flow(log_prints=True)
def main():
    rule = "=" * 60

    def banner(title, lead=""):
        # Three separate prints, exactly as a hand-written header would do.
        print(lead + rule)
        print(title)
        print(rule)

    with tempfile.TemporaryDirectory() as scratch:
        demo_root = Path(scratch) / "demo"
        create_project(demo_root)

        runner = PrefectDbtRunner(
            settings=PrefectDbtSettings(
                project_dir=demo_root,
                profiles_dir=demo_root / "profiles",
            )
        )

        banner("STEP 1: dbt build (1 model + 4 tests, 1 test will fail)")
        try:
            runner.invoke(["build"])
        except ValueError:
            # The failing singular test surfaces as ValueError; that is the
            # outcome the demo wants, so carry on.
            print("\n>>> 1 test failed (bad_test) - expected!")

        banner("STEP 2: Fix the bad test", lead="\n")
        corrected_sql = (
            "-- Fixed: now correctly expects at least 1 user\n"
            "select 1 where (select count(*) from {{ ref('users') }}) < 1"
        )
        (demo_root / "tests" / "bad_test.sql").write_text(corrected_sql)
        print("bad_test.sql fixed!")

        banner("STEP 3: dbt retry", lead="\n")
        print("Watch the output: only 1 of 4 tests re-runs!\n")
        runner.invoke(["retry"])

        banner("SUCCESS!", lead="\n")
        summary = """
Q: If 4 tests run and 1 fails, do we retry all 4?
A: NO - dbt retry only re-runs failed nodes.
- dbt writes results to target/run_results.json
- `dbt retry` reads that file to find failures
- Only failed nodes re-run (see "1 of 1" above)
This is dbt's native state tracking - works great with prefect-dbt!
"""
        print(summary)
# Run the demo flow only when executed as a script (e.g. via `uv run`),
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment