Created
August 20, 2025 01:29
-
-
Save zzstoatzz/632c3313d4d8853ee00a37f6069f9dbe to your computer and use it in GitHub Desktop.
must be run from github.com/prefecthq/prefect (requires editable install of prefect)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run -q --with-editable . | |
| # /// script | |
| # dependencies = ["matplotlib", "networkx", "rich"] | |
| # /// | |
| """Animation showing the Prefect transfer process as a DAG visualization.""" | |
| import argparse | |
| import asyncio | |
| import os | |
| import sys | |
| from unittest.mock import patch | |
| import matplotlib.animation as animation | |
| import matplotlib.pyplot as plt | |
| import networkx as nx | |
| import numpy as np | |
| from matplotlib.patches import Circle, FancyArrowPatch | |
| from rich.console import Console | |
| # No path hacks needed with editable install | |
| console = Console() | |
| plt.style.use("dark_background") | |
| RESOURCE_COLORS = { | |
| "MigratableWorkPool": "#00D9FF", # Bright cyan | |
| "MigratableWorkQueue": "#0066FF", # Bright blue | |
| "MigratableFlow": "#FF6B6B", # Coral red | |
| "MigratableDeployment": "#FFA500", # Orange | |
| "MigratableBlockDocument": "#00FF88", # Bright mint | |
| "MigratableBlockSchema": "#00CC66", # Medium mint | |
| "MigratableBlockType": "#009944", # Dark mint | |
| "MigratableVariable": "#FFD700", # Gold | |
| "MigratableGlobalConcurrencyLimit": "#FF00FF", # Magenta | |
| "MigratableAutomation": "#FF1493", # Deep pink | |
| } | |
| RESOURCE_DISPLAY_NAMES = { | |
| "MigratableWorkPool": "work pools", | |
| "MigratableWorkQueue": "work queues", | |
| "MigratableFlow": "flows", | |
| "MigratableDeployment": "deployments", | |
| "MigratableBlockDocument": "blocks", | |
| "MigratableBlockSchema": "block schemas", | |
| "MigratableBlockType": "block types", | |
| "MigratableVariable": "variables", | |
| "MigratableGlobalConcurrencyLimit": "concurrency limits", | |
| "MigratableAutomation": "automations", | |
| } | |
| CAPTURED_DAG = None | |
| CAPTURED_RESOURCES = None | |
| def ease_in_out_cubic(t): | |
| """Cubic easing function for smooth animation""" | |
| if t < 0.5: | |
| return 4 * t * t * t | |
| else: | |
| return 1 - pow(-2 * t + 2, 3) / 2 | |
| # Precompute easing samples for performance with large DAGs | |
| EASE_SAMPLES = np.array([ease_in_out_cubic(i / 30) for i in range(31)]) | |
| async def capture_dag(source_profile: str, dest_profile: str): | |
| """Run the transfer command but capture the DAG instead of executing""" | |
| global CAPTURED_DAG, CAPTURED_RESOURCES | |
| from prefect.cli.transfer import ( | |
| _execute_transfer, | |
| _find_root_resources, | |
| ) | |
| from prefect.cli.transfer._dag import TransferDAG | |
| from prefect.client.orchestration import get_client | |
| from prefect.settings import load_profile | |
| async def _capture_execute( | |
| self, process_node, max_workers=10, skip_on_failure=True | |
| ): | |
| global CAPTURED_DAG | |
| CAPTURED_DAG = self | |
| console.print(f"[green]captured dag with {len(self.nodes)} nodes[/green]") | |
| return {"completed": len(self.nodes), "failed": 0, "skipped": 0} | |
| load_profile(source_profile) | |
| async with get_client() as source_client: | |
| console.print(f"[cyan]collecting resources from {source_profile}...[/cyan]") | |
| from prefect.cli.transfer._migratable_resources import ( | |
| construct_migratable_resource, | |
| ) | |
| collections = await asyncio.gather( | |
| source_client.read_work_pools(), | |
| source_client.read_work_queues(), | |
| source_client.read_deployments(), | |
| source_client.read_flows(), | |
| source_client.read_block_documents(), | |
| source_client.read_variables(), | |
| source_client.read_global_concurrency_limits(), | |
| source_client.read_automations(), | |
| ) | |
| resources = await asyncio.gather( | |
| *[ | |
| construct_migratable_resource(item) | |
| for collection in collections | |
| for item in collection | |
| ] | |
| ) | |
| CAPTURED_RESOURCES = resources | |
| console.print(f"[green]found {len(resources)} resources[/green]") | |
| roots = await _find_root_resources(resources) | |
| dag = TransferDAG() | |
| await dag.build_from_roots(roots) | |
| load_profile(dest_profile) | |
| console.print("[cyan]Transferring resources...[/cyan]", end="") | |
| console.print(" " * 50, end="\r") | |
| with patch.object(TransferDAG, "execute_concurrent", _capture_execute): | |
| try: | |
| await _execute_transfer(dag, console) | |
| except Exception: | |
| # Expected since we're not actually transferring | |
| pass | |
| return CAPTURED_DAG, CAPTURED_RESOURCES | |
| class TransferAnimation: | |
| def __init__(self, dag, resources, source_profile: str, dest_profile: str): | |
| self.dag = dag | |
| self.resources = list(resources) | |
| self.fig, self.ax = plt.subplots(figsize=(20, 12), facecolor="#0A0A0A") | |
| self.ax.set_xlim(-1, 21) | |
| self.ax.set_ylim(-1, 13) | |
| self.ax.axis("off") | |
| self.source_profile = source_profile | |
| self.dest_profile = dest_profile | |
| self.G = nx.DiGraph() | |
| for node_id, resource in dag.nodes.items(): | |
| self.G.add_node(node_id, resource=resource) | |
| for node_id, deps in dag._dependencies.items(): | |
| for dep_id in deps: | |
| self.G.add_edge(dep_id, node_id) # dep -> node | |
| console.print( | |
| f"[cyan]graph has {len(self.G.nodes)} nodes and {len(self.G.edges)} edges[/cyan]" | |
| ) | |
| # Title | |
| self.title = self.ax.text( | |
| 10, 12, "", fontsize=28, ha="center", fontweight="bold", color="white" | |
| ) | |
| # Command text | |
| self.command_text = self.ax.text( | |
| 10, | |
| 0.2, | |
| "", | |
| fontsize=16, | |
| ha="center", | |
| family="monospace", | |
| color="#4ECDC4", | |
| fontweight="bold", | |
| ) | |
| # Progress indicator | |
| self.progress_text = self.ax.text( | |
| 1, 11, "", fontsize=16, ha="left", style="italic", color="#888" | |
| ) | |
| # Source and destination labels | |
| self.source_label = self.ax.text( | |
| 5, | |
| 11.5, | |
| f"source: {source_profile}", | |
| fontsize=16, | |
| ha="center", | |
| color="#4ECDC4", | |
| alpha=0, | |
| ) | |
| self.dest_label = self.ax.text( | |
| 15, | |
| 11.5, | |
| f"destination: {dest_profile}", | |
| fontsize=16, | |
| ha="center", | |
| color="#FF6B6B", | |
| alpha=0, | |
| ) | |
| self.pos = self._create_dag_layout() | |
| self.resource_counts = {} | |
| for node_id in self.G.nodes(): | |
| resource = self.G.nodes[node_id]["resource"] | |
| cls_name = resource.__class__.__name__ | |
| self.resource_counts[cls_name] = self.resource_counts.get(cls_name, 0) + 1 | |
| self.resource_groups = {} | |
| for node_id in self.G.nodes(): | |
| resource = self.G.nodes[node_id]["resource"] | |
| cls_name = resource.__class__.__name__ | |
| if cls_name not in self.resource_groups: | |
| self.resource_groups[cls_name] = [] | |
| self.resource_groups[cls_name].append(node_id) | |
| # Destination positions - grouped by type with clear labels and backgrounds | |
| self.dest_pos = {} | |
| current_y = 10.5 # Start position | |
| # Sort groups by count for better layout | |
| sorted_groups = sorted(self.resource_groups.items(), key=lambda x: -len(x[1])) | |
| # With many nodes, we need more columns and tighter spacing | |
| total_nodes = len(self.G.nodes()) | |
| if total_nodes > 400: | |
| cols = 30 # MANY columns for huge transfers | |
| node_spacing = 0.18 | |
| row_spacing = 0.18 | |
| elif total_nodes > 200: | |
| cols = 25 # Many columns for large transfers | |
| node_spacing = 0.22 | |
| row_spacing = 0.22 | |
| else: | |
| cols = 15 | |
| node_spacing = 0.35 | |
| row_spacing = 0.35 | |
| for cls_name, nodes in sorted_groups: | |
| if not nodes: | |
| continue | |
| # Calculate how many rows this group needs | |
| rows_needed = (len(nodes) - 1) // cols + 1 | |
| group_height = rows_needed * row_spacing | |
| color = RESOURCE_COLORS.get(cls_name, "#888") | |
| # Add clear, readable type label with background | |
| if RESOURCE_DISPLAY_NAMES.get(cls_name): | |
| # Add a subtle background rectangle for the group | |
| from matplotlib.patches import Rectangle | |
| # Background rect spans the whole group | |
| rect = Rectangle( | |
| (13.3, current_y - group_height - 0.5), | |
| 6.5, | |
| group_height + 0.6, | |
| facecolor=color, | |
| alpha=0.08, | |
| zorder=0, | |
| edgecolor=color, | |
| linewidth=0.5, | |
| ) | |
| self.ax.add_patch(rect) | |
| # Label at the top of the group | |
| label_text = RESOURCE_DISPLAY_NAMES[cls_name].upper() | |
| bbox_props = dict( | |
| boxstyle="round,pad=0.3", facecolor="white", alpha=0.9 | |
| ) | |
| self.ax.text( | |
| 13.5, | |
| current_y, | |
| label_text, | |
| fontsize=17, | |
| fontweight="bold", | |
| ha="left", | |
| color="black", | |
| bbox=bbox_props, | |
| zorder=15, | |
| ) | |
| # Count | |
| count_text = f"{len(nodes)}" | |
| count_bbox = dict(boxstyle="round,pad=0.2", facecolor=color, alpha=0.9) | |
| self.ax.text( | |
| 19.5, | |
| current_y, | |
| count_text, | |
| fontsize=15, | |
| fontweight="bold", | |
| ha="right", | |
| color="white", | |
| bbox=count_bbox, | |
| zorder=15, | |
| ) | |
| # Position nodes in this group | |
| for i, node in enumerate(nodes): | |
| x = 13.8 + (i % cols) * node_spacing | |
| y = current_y - 0.5 - (i // cols) * row_spacing | |
| self.dest_pos[node] = (x, y) | |
| # Move to next group with a gap | |
| current_y = current_y - group_height - 1.0 # 1.0 unit gap between groups | |
| # Don't go off screen | |
| if current_y < 1: | |
| break | |
| # Node and edge visuals | |
| self.node_patches = {} | |
| self.edge_patches = {} | |
| self.ghost_patches = {} # Ghost outlines left behind | |
| self.floating_patches = {} # Actively floating nodes | |
| self.dest_patches = {} # Final destination nodes | |
| # Animation state | |
| self.frame = 0 | |
| self.transfer_queue = [] # Nodes ready to transfer | |
| self.transferring = {} # node_id -> (start_frame, src_pos, dest_pos) mapping | |
| self.completed = set() | |
| try: | |
| # Deterministic topological sort for stable visuals | |
| self.topo_order = list(nx.lexicographical_topological_sort(self.G, key=str)) | |
| except nx.NetworkXError: | |
| self.topo_order = list(self.G.nodes()) | |
| self.node_deps = { | |
| node: set(self.G.predecessors(node)) for node in self.G.nodes() | |
| } | |
| # Legend removed - grouping and labeling is sufficient | |
| def _create_legend(self): | |
| """Create a legend showing resource types""" | |
| legend_elements = [] | |
| for cls_name, count in sorted( | |
| self.resource_counts.items(), key=lambda x: -x[1] | |
| ): | |
| if cls_name in RESOURCE_DISPLAY_NAMES: | |
| color = RESOURCE_COLORS.get(cls_name, "#888") | |
| label = f"{RESOURCE_DISPLAY_NAMES[cls_name]} ({count})" | |
| legend_elements.append(mpatches.Patch(color=color, label=label)) | |
| if legend_elements: | |
| self.ax.legend( | |
| handles=legend_elements, | |
| loc="upper right", | |
| fontsize=8, | |
| framealpha=0.8, | |
| facecolor="#1A1A1A", | |
| ) | |
| def _create_dag_layout(self): | |
| """Create a proper DAG layout""" | |
| if len(self.G.nodes()) == 0: | |
| return {} | |
| # Use hierarchical layout | |
| try: | |
| # Try to use graphviz layout if available | |
| pos = nx.nx_agraph.graphviz_layout(self.G, prog="dot", args="-Grankdir=TB") | |
| # Scale and position | |
| xs = [p[0] for p in pos.values()] | |
| ys = [p[1] for p in pos.values()] | |
| min_x, max_x = min(xs), max(xs) | |
| min_y, max_y = min(ys), max(ys) | |
| # Normalize and scale to left side | |
| for node, (x, y) in pos.items(): | |
| new_x = 1 + ((x - min_x) / (max_x - min_x + 1)) * 7 | |
| new_y = 2 + ((y - min_y) / (max_y - min_y + 1)) * 8 | |
| pos[node] = (new_x, new_y) | |
| return pos | |
| except Exception: | |
| # Fallback to manual hierarchical layout | |
| # Calculate depth of each node | |
| node_depths = {} | |
| for node in nx.topological_sort(self.G): | |
| preds = list(self.G.predecessors(node)) | |
| if not preds: | |
| node_depths[node] = 0 | |
| else: | |
| node_depths[node] = max(node_depths.get(p, 0) for p in preds) + 1 | |
| # Group by depth | |
| depth_groups = {} | |
| for node, depth in node_depths.items(): | |
| if depth not in depth_groups: | |
| depth_groups[depth] = [] | |
| depth_groups[depth].append(node) | |
| # Position nodes | |
| pos = {} | |
| max_depth = max(depth_groups.keys()) if depth_groups else 0 | |
| for depth, nodes in depth_groups.items(): | |
| # Y position based on depth (top to bottom) | |
| if max_depth > 0: | |
| y = 10 - (depth / max_depth) * 7 | |
| else: | |
| y = 6 | |
| # X position: distribute horizontally | |
| x_spacing = min(0.8, 7.0 / max(len(nodes), 1)) | |
| x_start = 4.5 - (len(nodes) * x_spacing) / 2 | |
| for i, node in enumerate(nodes): | |
| x = x_start + i * x_spacing | |
| pos[node] = (x, y) | |
| return pos | |
| def init(self): | |
| """Initialize animation - draw the full DAG on the left""" | |
| for edge in self.G.edges(): | |
| if edge[0] in self.pos and edge[1] in self.pos: | |
| start = self.pos[edge[0]] | |
| end = self.pos[edge[1]] | |
| arrow = FancyArrowPatch( | |
| start, | |
| end, | |
| connectionstyle="arc3,rad=0.1", | |
| arrowstyle="->,head_width=0.08,head_length=0.1", | |
| color="#333", | |
| alpha=0.5, | |
| linewidth=0.8, | |
| zorder=1, | |
| ) | |
| self.ax.add_patch(arrow) | |
| self.edge_patches[edge] = arrow | |
| for node_id, (x, y) in self.pos.items(): | |
| resource = self.G.nodes[node_id]["resource"] | |
| cls_name = resource.__class__.__name__ | |
| color = RESOURCE_COLORS.get(cls_name, "#888") | |
| # STRONG COLOR | |
| circle = Circle( | |
| (x, y), | |
| 0.18, # Bigger | |
| facecolor=color, | |
| edgecolor=color, # Same color edge for stronger look | |
| alpha=1.0, # Full opacity | |
| linewidth=0.5, # Thin edge | |
| zorder=3, | |
| ) | |
| self.ax.add_patch(circle) | |
| self.node_patches[node_id] = circle | |
| return [] | |
| def animate(self, frame_num): | |
| """Animation function""" | |
| # Phase 1: Show title and command | |
| if frame_num < 30: | |
| if frame_num == 0: | |
| self.title.set_text("prefect transfer") | |
| elif frame_num == 10: | |
| self.command_text.set_text( | |
| f"$ prefect transfer --from {self.source_profile} --to {self.dest_profile}" | |
| ) | |
| elif frame_num == 15: | |
| self.source_label.set_alpha(1) | |
| self.dest_label.set_alpha(1) | |
| elif frame_num == 20: | |
| total = len(self.G.nodes()) | |
| self.progress_text.set_text( | |
| f"starting transfer of {total} resources..." | |
| ) | |
| # Phase 2: Transfer animation | |
| else: | |
| transfer_frame = frame_num - 30 | |
| # Check which nodes are ready to transfer (dependencies satisfied) | |
| if transfer_frame % 2 == 0: # Check more frequently for smoother flow | |
| for node in self.topo_order: | |
| if node not in self.completed and node not in self.transferring: | |
| # Check if all dependencies are completed | |
| if all(dep in self.completed for dep in self.node_deps[node]): | |
| # Start transferring this node | |
| if node in self.pos and node in self.dest_pos: | |
| self.transferring[node] = ( | |
| transfer_frame, | |
| self.pos[node], | |
| self.dest_pos[node], | |
| ) | |
| # Create ghost outline at source | |
| x, y = self.pos[node] | |
| resource = self.G.nodes[node]["resource"] | |
| cls_name = resource.__class__.__name__ | |
| color = RESOURCE_COLORS.get(cls_name, "#888") | |
| ghost = Circle( | |
| (x, y), | |
| 0.15, | |
| facecolor="none", | |
| edgecolor=color, | |
| alpha=0.2, | |
| linewidth=1, | |
| linestyle=":", | |
| zorder=2, | |
| ) | |
| self.ax.add_patch(ghost) | |
| self.ghost_patches[node] = ghost | |
| # Hide original node | |
| if node in self.node_patches: | |
| self.node_patches[node].set_visible(False) | |
| # Create floating node | |
| floating = Circle( | |
| (x, y), | |
| 0.18, | |
| facecolor=color, | |
| edgecolor="#FFFFFF", | |
| alpha=1.0, | |
| linewidth=2.5, | |
| zorder=10, | |
| ) | |
| self.ax.add_patch(floating) | |
| self.floating_patches[node] = floating | |
| # Only start a few at a time for cleaner animation | |
| if ( | |
| len( | |
| [ | |
| n | |
| for n, (f, _, _) in self.transferring.items() | |
| if transfer_frame - f < 8 | |
| ] | |
| ) | |
| >= 3 | |
| ): | |
| break | |
| # Animate floating nodes with smooth easing | |
| for node, (start_frame, src_pos, dest_pos) in list( | |
| self.transferring.items() | |
| ): | |
| if node in self.floating_patches: | |
| floating = self.floating_patches[node] | |
| # Calculate smooth progress with easing | |
| linear_progress = min( | |
| 1.0, (transfer_frame - start_frame) / 30.0 | |
| ) # Slower, smoother | |
| # Use precomputed easing samples for performance | |
| progress = EASE_SAMPLES[int(round(linear_progress * 30))] | |
| src_x, src_y = src_pos | |
| dest_x, dest_y = dest_pos | |
| # Get resource info | |
| resource = self.G.nodes[node]["resource"] | |
| cls_name = resource.__class__.__name__ | |
| color = RESOURCE_COLORS.get(cls_name, "#888") | |
| # Smooth curved path | |
| current_x = src_x + (dest_x - src_x) * progress | |
| # Add smooth arc | |
| arc_height = 1.5 * np.sin(linear_progress * np.pi) | |
| current_y = src_y + (dest_y - src_y) * progress + arc_height | |
| floating.center = (current_x, current_y) | |
| # Gentle pulse effect | |
| pulse = 0.18 + 0.01 * np.sin(transfer_frame * 0.3) | |
| floating.radius = pulse | |
| # Fade edge as it travels | |
| edge_alpha = 1.0 - (linear_progress * 0.3) | |
| floating.set_edgecolor((1, 1, 1, edge_alpha)) | |
| # Complete transfer | |
| if linear_progress >= 1.0: | |
| # Remove from transferring | |
| del self.transferring[node] | |
| self.completed.add(node) | |
| # Remove floating node | |
| floating.remove() | |
| del self.floating_patches[node] | |
| # Create final destination node - STRONGLY COLORED | |
| final = Circle( | |
| (dest_x, dest_y), | |
| 0.14, | |
| facecolor=color, | |
| edgecolor=color, | |
| alpha=1.0, # Full opacity for strong color | |
| linewidth=0.5, | |
| zorder=5, | |
| ) | |
| self.ax.add_patch(final) | |
| self.dest_patches[node] = final | |
| total = len(self.G.nodes()) | |
| completed = len(self.completed) | |
| in_progress = len(self.transferring) | |
| if completed < total: | |
| percentage = int(100 * completed / total) | |
| status = f"transferred: {completed:3d}/{total} ({percentage:3d}%)" | |
| if in_progress > 0: | |
| status += f" • in flight: {in_progress}" | |
| self.progress_text.set_text(status) | |
| else: | |
| self.progress_text.set_text(f"✓ all {total} resources transferred") | |
| self.progress_text.set_color("#00FF00") | |
| return [] | |
| async def create_animation( | |
| source_profile: str, | |
| dest_profile: str, | |
| frames: int | None = None, | |
| suffix: str = "", | |
| fps: int = 30, | |
| dpi: int = 100, | |
| fmt: str = "gif", | |
| outdir: str = ".", | |
| ): | |
| """Create and save the animation""" | |
| console.print("[yellow]capturing transfer dag...[/yellow]") | |
| try: | |
| dag, resources = await capture_dag(source_profile, dest_profile) | |
| if dag and resources: | |
| console.print(f"[green]captured dag with {len(dag.nodes)} nodes[/green]") | |
| else: | |
| raise ValueError("failed to capture dag") | |
| except Exception as e: | |
| console.print(f"[red]error capturing dag: {e}[/red]") | |
| return | |
| anim_obj = TransferAnimation(dag, resources, source_profile, dest_profile) | |
| if frames is None: | |
| # Need enough frames for all nodes to transfer | |
| # Each batch of 3 nodes starts every 2 frames, takes 30 frames to complete | |
| # So roughly: (nodes/3) * 2 + 30 for the last batch + 30 for intro | |
| min_frames = (len(dag.nodes) // 3) * 2 + 60 | |
| frames_needed = max(min_frames, len(dag.nodes) * 2 + 100) | |
| console.print( | |
| f"[dim]auto-calculated {frames_needed} frames for {len(dag.nodes)} nodes[/dim]" | |
| ) | |
| else: | |
| frames_needed = frames | |
| min_recommended = (len(dag.nodes) // 3) * 2 + 60 | |
| if frames_needed < min_recommended: | |
| console.print( | |
| f"[yellow]warning: {frames_needed} frames may be too few for {len(dag.nodes)} nodes (recommended: {min_recommended}+)[/yellow]" | |
| ) | |
| console.print(f"[cyan]generating animation with {frames_needed} frames...[/cyan]") | |
| anim = animation.FuncAnimation( | |
| anim_obj.fig, | |
| anim_obj.animate, | |
| init_func=anim_obj.init, | |
| frames=frames_needed, | |
| interval=30, # 30ms between frames for very smooth motion | |
| blit=False, | |
| ) | |
| # Save as GIF | |
| # Save animation | |
| gif_name = os.path.join(outdir, f"transfer_animation{suffix}.gif") | |
| mp4_name = os.path.join(outdir, f"transfer_animation{suffix}.mp4") | |
| if fmt in ("gif", "both"): | |
| console.print(f"[cyan]saving animation to {gif_name}...[/cyan]") | |
| anim.save(gif_name, writer="pillow", fps=fps, dpi=dpi) | |
| console.print(f"[green]✓ animation saved as {gif_name}[/green]") | |
| if fmt in ("mp4", "both"): | |
| try: | |
| console.print(f"[cyan]saving animation to {mp4_name}...[/cyan]") | |
| anim.save(mp4_name, writer="ffmpeg", fps=fps, dpi=dpi, codec="h264") | |
| console.print(f"[green]✓ animation saved as {mp4_name}[/green]") | |
| except Exception: | |
| console.print( | |
| "[yellow]mp4 export requires ffmpeg (e.g., `brew install ffmpeg`)[/yellow]" | |
| ) | |
| def _ensure_headless(): | |
| """Ensure matplotlib uses a non-interactive backend if no display is available.""" | |
| import matplotlib | |
| if not os.environ.get("DISPLAY") and sys.platform != "win32": | |
| matplotlib.use("Agg") | |
| if __name__ == "__main__": | |
| _ensure_headless() | |
| parser = argparse.ArgumentParser(description="Generate Prefect transfer animation") | |
| parser.add_argument("--source", default="pond", help="Source profile") | |
| parser.add_argument("--dest", default="testing", help="Destination profile") | |
| parser.add_argument( | |
| "--frames", type=int, help="Number of frames (default: auto-calculate)" | |
| ) | |
| parser.add_argument( | |
| "--suffix", default="", help="Suffix for output files (e.g., '_test')" | |
| ) | |
| parser.add_argument("--fps", type=int, default=30, help="Frames per second") | |
| parser.add_argument("--dpi", type=int, default=100, help="DPI for output") | |
| parser.add_argument( | |
| "--format", choices=["gif", "mp4", "both"], default="gif", help="Output format" | |
| ) | |
| parser.add_argument("--outdir", default=".", help="Output directory") | |
| args = parser.parse_args() | |
| asyncio.run( | |
| create_animation( | |
| source_profile=args.source, | |
| dest_profile=args.dest, | |
| frames=args.frames, | |
| suffix=args.suffix, | |
| fps=args.fps, | |
| dpi=args.dpi, | |
| fmt=args.format, | |
| outdir=args.outdir, | |
| ) | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment