Skip to content

Instantly share code, notes, and snippets.

@arockwell
Created July 13, 2025 07:27
Show Gist options
  • Select an option

  • Save arockwell/e4d8bf13159fd617f3661afd5c048fc2 to your computer and use it in GitHub Desktop.

Select an option

Save arockwell/e4d8bf13159fd617f3661afd5c048fc2 to your computer and use it in GitHub Desktop.
EMDX Background Jobs: Complete Implementation Guide — architecture for async job execution with an integrated log viewer

Building Background Execution + Log Viewer into EMDX

Implementation Roadmap

Phase 1: Core Infrastructure

1.1 Job Management System

# emdx/models/jobs.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
import uuid

@dataclass
class Job:
    """A background job attached to an EMDX document.

    New jobs are normally built with :meth:`Job.create`. It assigns a unique
    id, the initial ``'queued'`` status and a per-job log file path.
    """

    id: str
    doc_id: int
    title: str
    type: str  # 'claude', 'git', 'monitor', 'custom'
    status: str  # 'queued', 'running', 'completed', 'failed'
    command: str
    log_file: str  # may start with '~'; expand with Path(...).expanduser()
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    exit_code: Optional[int] = None
    # default_factory gives every Job its own empty dict. A plain `= None`
    # left metadata as None for jobs constructed directly (not via create()).
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, doc_id: int, title: str, type: str, command: str) -> 'Job':
        """Build a new queued job for *doc_id*.

        The id is ``<type>-<doc_id>-<8 hex chars>``. The log file is
        ``~/.config/emdx/logs/<id>.log``.
        """
        job_id = f"{type}-{doc_id}-{uuid.uuid4().hex[:8]}"
        return cls(
            id=job_id,
            doc_id=doc_id,
            title=title,
            type=type,
            status='queued',
            command=command,
            log_file=f"~/.config/emdx/logs/{job_id}.log",
            created_at=datetime.now(),
        )

1.2 Database Schema for Jobs

-- emdx/database/migrations.py - Add to migrations
CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,           -- '<type>-<doc_id>-<hex>' as built by Job.create
    doc_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    type TEXT NOT NULL,            -- 'claude', 'git', 'monitor', 'custom'
    status TEXT NOT NULL,          -- 'queued', 'running', 'completed', 'failed'
    command TEXT NOT NULL,
    log_file TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    exit_code INTEGER,
    metadata JSON,
    FOREIGN KEY (doc_id) REFERENCES documents(id)
);

-- The table statement is idempotent, so the indexes must be too:
-- without IF NOT EXISTS a second run of this migration fails here.
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_doc_id ON jobs(doc_id);
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at);

1.3 Job Runner Service

# emdx/services/job_runner.py
import asyncio
import contextlib
import subprocess
from datetime import datetime
from pathlib import Path
import json
from typing import Dict, Optional
from emdx.models.jobs import Job
from emdx.database import db

class JobRunner:
    """Run :class:`Job` shell commands as asyncio subprocesses.

    Each command's stdout and stderr go to the job's log file, between a
    header and a footer. The job's status, timestamps and exit code are
    persisted via ``_update_job_in_db``.

    NOTE(review): ``_update_job_in_db`` and ``_get_working_directory`` are
    called below but not defined in this snippet.
    """

    def __init__(self):
        # job id -> live subprocess. Only execute_job() on *this* instance
        # fills it, so kill_job() can only reach jobs this runner started.
        self.running_jobs: Dict[str, asyncio.subprocess.Process] = {}

    async def execute_job(self, job: Job) -> None:
        """Run *job*'s command to completion, logging output and status.

        The job moves to 'running', then to 'completed' (exit code 0) or
        'failed'. It is marked 'failed' on a non-zero exit, an exception, or
        cancellation of this task. Exceptions are written to the log instead
        of being raised. Cancellation is re-raised after the job is marked
        failed.
        """
        job.status = 'running'
        job.started_at = datetime.now()
        self._update_job_in_db(job)

        log_path = Path(job.log_file).expanduser()
        log_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            # Explicit UTF-8: titles and commands may contain emoji (e.g. tag
            # markers) that a non-UTF-8 platform default cannot encode.
            with open(log_path, 'w', encoding='utf-8') as log_file:
                log_file.write(
                    f"=== EMDX Job: {job.title} ===\n"
                    f"ID: {job.id}\n"
                    f"Type: {job.type}\n"
                    f"Started: {job.started_at}\n"
                    f"Command: {job.command}\n"
                    + "=" * 50 + "\n\n"
                )
                # Flush before the child starts. It writes straight to the
                # same descriptor, so buffered header text would otherwise
                # land after the child's output.
                log_file.flush()

                process = await asyncio.create_subprocess_shell(
                    job.command,
                    stdout=log_file,
                    stderr=asyncio.subprocess.STDOUT,
                    cwd=self._get_working_directory(job),
                )
                self.running_jobs[job.id] = process

                exit_code = await process.wait()

                log_file.write(f"\n\n=== Job completed with exit code: {exit_code} ===\n")

            job.status = 'completed' if exit_code == 0 else 'failed'
            job.completed_at = datetime.now()
            job.exit_code = exit_code
            self._update_job_in_db(job)

        except asyncio.CancelledError:
            # The awaiting task was cancelled (e.g. the app is shutting down).
            # Stop the child so it is not orphaned. Do not leave the job
            # recorded as 'running' forever. Then propagate the cancellation.
            process = self.running_jobs.get(job.id)
            if process is not None and process.returncode is None:
                with contextlib.suppress(ProcessLookupError):
                    process.terminate()
            job.status = 'failed'
            job.completed_at = datetime.now()
            self._update_job_in_db(job)
            raise

        except Exception as e:
            with open(log_path, 'a', encoding='utf-8') as log_file:
                log_file.write(f"\n\nERROR: {e}\n")

            job.status = 'failed'
            job.completed_at = datetime.now()
            job.exit_code = -1
            self._update_job_in_db(job)

        finally:
            self.running_jobs.pop(job.id, None)

    def kill_job(self, job_id: str) -> bool:
        """Ask a running job's process to terminate.

        Returns True if a live process started by this runner was signalled.
        Returns False if the job is unknown to this runner or has already
        exited.

        NOTE(review): the command runs through a shell, so terminating the
        shell may not stop grandchild processes it spawned. Confirm this is
        acceptable.
        """
        process = self.running_jobs.get(job_id)
        if process is None or process.returncode is not None:
            return False
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            return False
        return True

Phase 2: TUI Integration

2.1 Update Browser Key Bindings

# emdx/ui/textual_browser.py - Add to BINDINGS
# Key bindings for the background-job actions. Each action string maps to a
# same-named `action_<name>` method on the browser (e.g. "claude_execute" ->
# action_claude_execute).
# NOTE(review): action_show_logs and action_tail_last_job are not defined
# anywhere in this guide yet.
# NOTE(review): depending on the Textual version, Shift+L may be reported as
# the key "L" rather than "shift+l". Verify that this binding actually fires.
Binding("c", "claude_execute", "Claude", key_display="c"),
Binding("g", "git_operation", "Git Op", key_display="g"),
Binding("l", "show_logs", "Logs", key_display="l"),
Binding("shift+l", "tail_last_job", "Tail Last", key_display="L"),

2.2 Background Execution Actions

# emdx/ui/textual_browser.py - Add these methods
async def action_claude_execute(self):
    """Run Claude in the background on the currently selected document.

    The document is written to a temporary Markdown file. Then
    ``claude-code --file <tmp> --non-interactive`` is started as a 'claude'
    job, and the temp file is removed once the job's task finishes.
    """
    if not self.current_doc_id:
        self.show_status("No document selected")
        return

    import asyncio
    import contextlib
    import os
    import shlex
    import tempfile

    from emdx.models.documents import get_document
    from emdx.models.jobs import Job
    from emdx.services.job_runner import JobRunner

    doc = get_document(str(self.current_doc_id))
    if not doc:
        # NOTE(review): assumes get_document returns a falsy value for an
        # unknown id. Confirm; indexing doc['title'] would fail otherwise.
        self.show_status(f"Document {self.current_doc_id} not found")
        return

    # delete=False: the file must outlive this `with` so the job can read it.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.md', delete=False, encoding='utf-8'
    ) as f:
        f.write(f"# {doc['title']}\n\n{doc['content']}")
        temp_path = f.name

    # Quote the path: TMPDIR may contain spaces or shell metacharacters.
    command = f"claude-code --file {shlex.quote(temp_path)} --non-interactive"
    job = Job.create(
        doc_id=self.current_doc_id,
        title=f"Claude: {doc['title']}",
        type='claude',
        command=command,
    )

    # NOTE(review): save_job is not imported in this snippet. Confirm its module.
    save_job(job)

    # Reuse one runner per browser. JobRunner.kill_job only sees processes
    # started by the same runner instance.
    runner = getattr(self, "_job_runner", None)
    if runner is None:
        runner = self._job_runner = JobRunner()

    task = asyncio.create_task(runner.execute_job(job))
    # The event loop keeps only weak references to tasks. Hold a strong one
    # until the job finishes so it cannot be garbage-collected mid-run.
    pending = getattr(self, "_job_tasks", None)
    if pending is None:
        pending = self._job_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

    def _remove_temp_file(_task):
        # execute_job has returned, so nothing will read the prompt file again.
        with contextlib.suppress(OSError):
            os.unlink(temp_path)

    task.add_done_callback(_remove_temp_file)

    self.show_status("Claude execution started → Press 'l' to view logs")

async def action_git_operation(self):
    """Create a branch for the current document and commit all changes to it.

    Runs ``git checkout -b <branch> && git add . && git commit -m 'WIP: <title>'``
    as a background 'git' job.
    """
    if not self.current_doc_id:
        self.show_status("No document selected")
        return

    import asyncio
    import re
    import shlex

    # get_document, Job and JobRunner must be imported here; nothing else in
    # this snippet brings them into scope.
    from emdx.models.documents import get_document
    from emdx.models.jobs import Job
    from emdx.services.job_runner import JobRunner

    doc = get_document(str(self.current_doc_id))
    if not doc:
        # NOTE(review): assumes get_document returns a falsy value for an
        # unknown id. Confirm.
        self.show_status(f"Document {self.current_doc_id} not found")
        return

    # Titles are free text. Collapse every run of characters outside
    # [a-z0-9] to '-' so the branch is a valid git ref (no spaces, '..', '~',
    # ':' ...) and contains nothing the shell would interpret.
    slug = re.sub(r'[^a-z0-9]+', '-', doc['title'][:20].lower()).strip('-')
    branch_name = f"gameplan-{self.current_doc_id}"
    if slug:
        branch_name += f"-{slug}"

    # shlex.quote: a title containing a quote (e.g. "Bob's plan") would break
    # out of hand-written '...' quoting and could inject shell commands.
    commit_message = f"WIP: {doc['title']}"
    command = " && ".join([
        f"git checkout -b {shlex.quote(branch_name)}",
        "git add .",
        f"git commit -m {shlex.quote(commit_message)}",
        "echo 'Branch created and changes committed'",
    ])

    job = Job.create(
        doc_id=self.current_doc_id,
        title=f"Git: Create branch for {doc['title']}",
        type='git',
        command=command,
    )

    # NOTE(review): save_job is not imported in this snippet. Confirm its module.
    save_job(job)

    # Reuse one runner per browser. JobRunner.kill_job only sees processes
    # started by the same runner instance.
    runner = getattr(self, "_job_runner", None)
    if runner is None:
        runner = self._job_runner = JobRunner()

    task = asyncio.create_task(runner.execute_job(job))
    # The event loop keeps only weak references to tasks. Hold a strong one
    # until the job finishes so it cannot be garbage-collected mid-run.
    pending = getattr(self, "_job_tasks", None)
    if pending is None:
        pending = self._job_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

    self.show_status("Git operation started → Press 'l' to view logs")

Phase 3: Log Viewer TUI

3.1 Log Viewer Screen

# emdx/ui/log_viewer.py
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import DataTable, RichLog, Footer, Header
from textual.containers import Horizontal, Vertical
from textual.binding import Binding
import asyncio
import codecs
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

from rich.markup import escape

from emdx.models.jobs import Job

class LogViewerScreen(Screen):
    """Interactive log viewer for background jobs.

    The left pane lists recent jobs and refreshes every 2 seconds. The
    right pane shows the selected job's log. Enter loads it once; f tails it
    live while the job is running.

    NOTE(review): ``get_recent_jobs`` is not imported in this snippet. The
    ``jump_to_doc``, ``kill_job`` and ``search`` bindings have no
    ``action_*`` methods here yet.
    """

    BINDINGS = [
        Binding("q", "close", "Close"),
        Binding("j", "cursor_down", "Down"),
        Binding("k", "cursor_up", "Up"),
        Binding("enter", "view_log", "View Log"),
        Binding("f", "follow_log", "Follow/Tail"),
        Binding("d", "jump_to_doc", "View Document"),
        Binding("x", "kill_job", "Kill Job"),
        Binding("r", "refresh", "Refresh"),
        Binding("/", "search", "Search Logs"),
    ]

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Jobs currently shown in the table, keyed by job id (also the row key).
        self._jobs: Dict[str, Job] = {}
        # Task currently tailing a log into the right pane, if any.
        self.tail_task: Optional[asyncio.Task] = None

    def compose(self) -> ComposeResult:
        yield Header(show_clock=True)
        with Horizontal():
            # Job list on the left
            with Vertical(id="job-list", classes="box"):
                yield DataTable(id="jobs-table")

            # Log viewer on the right
            with Vertical(id="log-viewer", classes="box"):
                yield RichLog(id="log-output", highlight=True, markup=True)

        yield Footer()

    async def on_mount(self) -> None:
        """Set up the job table, load it, and start the 2-second auto-refresh."""
        table = self.query_one("#jobs-table", DataTable)
        table.add_columns("Status", "Type", "Title", "Started", "Duration")

        await self.refresh_jobs()
        self.set_interval(2.0, self.refresh_jobs)

    def on_unmount(self) -> None:
        """Stop any live tail when the screen goes away."""
        self._stop_tail()

    async def refresh_jobs(self) -> None:
        """Reload the 50 most recent jobs, keeping the current selection."""
        jobs = get_recent_jobs(limit=50)
        table = self.query_one("#jobs-table", DataTable)

        selected = self.get_selected_job()
        selected_id = selected.id if selected is not None else None

        # Clearing the table can move the cursor back to the first row. That
        # would let the periodic refresh steal the user's selection, so
        # restore it below.
        table.clear()
        self._jobs = {}
        restore_row = None
        for row_index, job in enumerate(jobs):
            self._jobs[job.id] = job
            table.add_row(
                self._get_status_icon(job.status),
                job.type.upper(),
                job.title[:40],
                job.started_at.strftime('%H:%M:%S') if job.started_at else '-',
                self._format_duration(job),
                key=job.id,
            )
            if job.id == selected_id:
                restore_row = row_index
        if restore_row is not None:
            table.move_cursor(row=restore_row)

    def get_selected_job(self) -> Optional[Job]:
        """Return the job under the table cursor, or None if the table is empty."""
        table = self.query_one("#jobs-table", DataTable)
        if table.row_count == 0:
            return None
        cell_key = table.coordinate_to_cell_key(table.cursor_coordinate)
        return self._jobs.get(cell_key.row_key.value)

    def _get_status_icon(self, status: str) -> str:
        """Get icon for job status."""
        return {
            'queued': '⏳',
            'running': '●',
            'completed': '✓',
            'failed': '✗'
        }.get(status, '?')

    def _format_duration(self, job: Job) -> str:
        """Return elapsed run time as M:SS or H:MM:SS ('-' if never started)."""
        if job.started_at is None:
            return '-'
        end = job.completed_at or datetime.now()
        total = max(0, int((end - job.started_at).total_seconds()))
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def action_close(self) -> None:
        """Leave the log viewer."""
        self._stop_tail()
        self.app.pop_screen()

    def action_cursor_down(self) -> None:
        """Move the job-table cursor down (j)."""
        self.query_one("#jobs-table", DataTable).action_cursor_down()

    def action_cursor_up(self) -> None:
        """Move the job-table cursor up (k)."""
        self.query_one("#jobs-table", DataTable).action_cursor_up()

    async def action_refresh(self) -> None:
        """Reload the job list immediately (r)."""
        await self.refresh_jobs()

    async def action_view_log(self) -> None:
        """Show the selected job's whole log in the right pane."""
        job = self.get_selected_job()
        if job is None:
            return

        # A live tail would keep appending into the pane we are replacing.
        self._stop_tail()
        log_widget = self.query_one("#log-output", RichLog)
        log_widget.clear()

        log_path = Path(job.log_file).expanduser()
        if log_path.exists():
            content = log_path.read_text(encoding="utf-8", errors="replace")
            # The pane renders markup, so escape '[...]' in command output.
            log_widget.write(escape(content))
        else:
            log_widget.write("[red]Log file not found[/red]")

    async def action_follow_log(self) -> None:
        """Tail the selected job's log live while it is running."""
        job = self.get_selected_job()
        if job is None or job.status != 'running':
            return

        # One tail at a time, starting from a clean pane. Otherwise repeated
        # presses interleave writers and duplicate content.
        self._stop_tail()
        self.query_one("#log-output", RichLog).clear()
        self.tail_task = asyncio.create_task(self._tail_log(job))

    def _stop_tail(self) -> None:
        """Cancel the in-flight tail task, if any."""
        task = self.tail_task
        if task is not None and not task.done():
            task.cancel()
        self.tail_task = None

    def _job_is_running(self, job_id: str) -> bool:
        """Report whether the latest table refresh still shows *job_id* as running."""
        # The Job handed to _tail_log is a snapshot whose status never changes.
        # self._jobs is rebuilt by refresh_jobs() every 2 s (from
        # get_recent_jobs, which presumably returns fresh rows).
        current = self._jobs.get(job_id)
        return current is not None and current.status == 'running'

    async def _tail_log(self, job: Job) -> None:
        """Stream new output from *job*'s log into the right pane.

        Polls every 0.5 s. It stops after one final read once the job is no
        longer running, or when the task is cancelled.
        """
        log_widget = self.query_one("#log-output", RichLog)
        log_path = Path(job.log_file).expanduser()
        # A read can end partway through a multi-byte UTF-8 character. The
        # incremental decoder carries the partial bytes into the next read.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        offset = 0
        partial_line = ""

        while True:
            # Sample the status *before* reading, so the pass made after the
            # job ends picks up everything the job wrote.
            finished = not self._job_is_running(job.id)
            if log_path.exists():
                with open(log_path, "rb") as f:
                    f.seek(offset)
                    chunk = f.read()
                    offset = f.tell()
                partial_line += decoder.decode(chunk)
                # Each write() to the pane starts a new line. Emit only
                # complete lines and keep the unfinished tail for later.
                *complete, partial_line = partial_line.split("\n")
                if complete:
                    log_widget.write(escape("\n".join(complete)))
            if finished:
                break
            await asyncio.sleep(0.5)

        partial_line += decoder.decode(b"", final=True)
        if partial_line:
            log_widget.write(escape(partial_line))

Phase 4: Advanced Features

4.1 Job Templates

# emdx/models/job_templates.py
import shlex
from typing import Optional


class JobTemplate:
    """Predefined job templates based on document tags."""

    @staticmethod
    def get_template(doc: dict, tags: list[str]) -> Optional[str]:
        """Return the shell script for the first matching tag, or None.

        Tags are checked in priority order: 🎯 gameplan, 🧪 test, 🐛 bug.
        Document text is passed through ``shlex.quote``. That way quotes,
        ``$`` or backticks in a title or content cannot break out of (or
        inject into) the generated command.
        """
        title = doc['title']
        if '🎯' in tags:  # Gameplan
            # `<(...)` is bash process substitution. /bin/sh (which
            # create_subprocess_shell runs) is not guaranteed to support it,
            # so run that line under bash explicitly.
            run_plan = (
                "claude-code --execute-plan "
                f"<(printf '%s' {shlex.quote(doc['content'])})"
            )
            return "\n".join([
                "echo " + shlex.quote(f"Executing gameplan: {title}"),
                "bash -c " + shlex.quote(run_plan),
            ])
        if '🧪' in tags:  # Test
            return "\n".join([
                "echo " + shlex.quote(f"Running tests for: {title}"),
                "pytest -v",
            ])
        if '🐛' in tags:  # Bug
            return "\n".join([
                "echo " + shlex.quote(f"Investigating bug: {title}"),
                "git log --oneline -10",
                "git status",
            ])
        return None

4.2 Log Analysis

# emdx/services/log_analyzer.py
import re
from pathlib import Path
from typing import List, Dict, Any

class LogAnalyzer:
    """Analyze job logs for patterns and insights."""

    # Compiled once. Each pattern captures the rest of the line after its
    # marker (re.M makes `$` match at every line end).
    _PATTERNS = {
        'files_created': re.compile(r'Created: (.+)$', re.M),
        'files_modified': re.compile(r'Modified: (.+)$', re.M),
        'commands_run': re.compile(r'\$ (.+)$', re.M),
        'errors': re.compile(r'ERROR: (.+)$', re.M),
        'warnings': re.compile(r'WARNING: (.+)$', re.M),
    }

    def analyze_claude_log(self, log_content: str) -> Dict[str, Any]:
        """Extract Claude execution insights.

        Returns a dict mapping each key of ``_PATTERNS`` to the list of
        captured line remainders, in log order.
        """
        return {
            key: pattern.findall(log_content)
            for key, pattern in self._PATTERNS.items()
        }

    def generate_summary(self, job: "Job") -> str:
        """Return a short plain-text summary of *job*'s log.

        Only 'claude' jobs get counts. Other job types get a fixed message
        instead of ``None``, so the ``-> str`` contract holds.
        """
        log_path = Path(job.log_file).expanduser()
        if not log_path.exists():
            return "No log file found"

        if job.type != 'claude':
            return f"No summary available for '{job.type}' jobs"

        content = log_path.read_text(encoding="utf-8", errors="replace")
        analysis = self.analyze_claude_log(content)
        return "\n".join([
            f"Files created: {len(analysis['files_created'])}",
            f"Files modified: {len(analysis['files_modified'])}",
            f"Commands run: {len(analysis['commands_run'])}",
            f"Errors: {len(analysis['errors'])}",
            f"Warnings: {len(analysis['warnings'])}",
        ])

Phase 5: Integration Points

5.1 Web Dashboard

# emdx/web/job_dashboard.py
from flask import Flask, render_template, jsonify
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
import json

# NOTE(review): get_recent_jobs and get_job are used below but not imported
# in this snippet, and the 'jobs.html' template is not part of this guide.

app = Flask(__name__)


def _job_to_json(job) -> dict:
    """Convert a Job dataclass into a JSON-ready dict (datetimes as ISO 8601)."""
    return {
        key: value.isoformat() if isinstance(value, datetime) else value
        for key, value in asdict(job).items()
    }


@app.route('/jobs')
def job_dashboard():
    """Web dashboard for job monitoring."""
    return render_template('jobs.html')


@app.route('/api/jobs')
def api_jobs():
    """Return the 100 most recent jobs as a JSON list."""
    jobs = get_recent_jobs(limit=100)
    # The Job dataclass defines no to_dict(); convert via dataclasses.asdict.
    return jsonify([_job_to_json(job) for job in jobs])


@app.route('/api/jobs/<job_id>/log')
def api_job_log(job_id):
    """Return a job's full log as ``{"content": ...}``.

    The whole file is returned in one response; it is not streamed. Returns
    404 for an unknown job, and empty content if the log file does not exist
    yet.
    """
    job = get_job(job_id)
    if not job:
        return jsonify({'error': 'Job not found'}), 404

    log_path = Path(job.log_file).expanduser()
    if log_path.exists():
        return jsonify({'content': log_path.read_text(encoding='utf-8', errors='replace')})
    return jsonify({'content': ''})

5.2 Notification System

# emdx/services/notifications.py
import subprocess
import sys
from typing import Optional

class JobNotifier:
    """Send notifications for job events."""

    @staticmethod
    def _applescript_string(text: str) -> str:
        """Return *text* as a double-quoted AppleScript string literal.

        Backslashes and double quotes are escaped. Without this, a job title
        containing a quote would break (or alter) the osascript command.
        """
        escaped = text.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    def notify_completion(self, job: "Job"):
        """Notify that *job* finished (macOS banner when available, plus a bell)."""
        if job.status == 'completed':
            title = "✅ Job Completed"
            message = f"{job.title} finished successfully"
        else:
            title = "❌ Job Failed"
            message = f"{job.title} failed with exit code {job.exit_code}"

        # osascript only exists on macOS. Elsewhere, fall back to the bell alone.
        if sys.platform == 'darwin':
            script = (
                f"display notification {self._applescript_string(message)} "
                f"with title {self._applescript_string(title)}"
            )
            try:
                subprocess.run(['osascript', '-e', script], check=False)
            except OSError:
                # Notifications are best-effort: a missing or unrunnable
                # osascript must not break job completion handling.
                pass

        # Terminal bell (BEL, \a).
        print('\a', end='', flush=True)

Phase 6: CLI Commands

6.1 Job Management Commands

# emdx/commands/jobs.py
import typer
from pathlib import Path
from typing import Optional
from rich.console import Console
from rich.table import Table

# NOTE(review): get_recent_jobs, get_job, JobRunner, follow_job_log,
# tail_job_log, _get_status_display and _format_duration are used below but
# are not defined or imported in this snippet.

app = typer.Typer()
console = Console()

@app.command()
def jobs(
    status: Optional[str] = typer.Option(None, "--status", "-s"),
    limit: int = typer.Option(20, "--limit", "-n"),
    follow: Optional[str] = typer.Option(None, "--follow", "-f"),
):
    """List background jobs, or follow one job's log with --follow JOB_ID."""
    if follow:
        follow_job_log(follow)
        return

    recent_jobs = get_recent_jobs(status=status, limit=limit)

    table = Table(title="EMDX Background Jobs")
    # no_wrap keeps the id copyable: it is what `logs` and `kill` expect.
    table.add_column("ID", style="cyan", no_wrap=True)
    table.add_column("Status", style="green")
    table.add_column("Type", style="yellow")
    table.add_column("Title", style="white")
    table.add_column("Duration", style="magenta")

    for job in recent_jobs:
        table.add_row(
            # Full id: ids look like '<type>-<doc_id>-<hex>', so an 8-char
            # prefix (e.g. 'claude-4') was ambiguous and could not be passed
            # back to `logs` / `kill`.
            job.id,
            _get_status_display(job.status),
            job.type,
            job.title[:40],
            _format_duration(job),
        )

    console.print(table)

@app.command()
def kill(job_id: str):
    """Kill a running job.

    NOTE(review): JobRunner only tracks processes started by the same
    instance (running_jobs is filled in execute_job). A runner created here,
    in a fresh CLI process, therefore never knows any job, and this command
    always reports "not found". Killing across processes needs the job's PID
    persisted; the jobs table has no such column yet.
    """
    runner = JobRunner()
    if runner.kill_job(job_id):
        console.print(f"[green]✅ Killed job {job_id}[/green]")
    else:
        console.print(f"[red]❌ Job {job_id} not found or not running[/red]")

@app.command()
def logs(job_id: str, tail: bool = typer.Option(False, "--tail", "-f")):
    """Print a job's log, or live-tail it with --tail while the job runs."""
    job = get_job(job_id)
    if not job:
        console.print("[red]Job not found[/red]")
        return

    if tail and job.status == 'running':
        tail_job_log(job_id)
        return

    log_path = Path(job.log_file).expanduser()
    if not log_path.exists():
        console.print(f"[yellow]No log file for job {job_id}[/yellow]")
        return

    # Print verbatim. Command output may contain '[...]' or ':name:' text
    # that Rich would otherwise interpret as markup or emoji codes.
    console.print(
        log_path.read_text(encoding="utf-8", errors="replace"),
        markup=False,
        emoji=False,
    )

Summary

This implementation provides:

  1. Async job execution with full logging
  2. Database persistence for job history
  3. Interactive TUI for monitoring
  4. CLI commands for management
  5. Web dashboard option
  6. Notifications on completion
  7. Log analysis capabilities
  8. Template system for common operations

The key idea: every job runs in the background, so your workspace stays free for other work, while the TUI, CLI, and web views give you full visibility into what each job is doing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment