# emdx/models/jobs.py
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
@dataclass
class Job:
    """A background job launched for an EMDX document.

    Instances are normally built with :meth:`create`, which generates the id
    and log-file path; the remaining fields are filled in as the job runs.
    """

    id: str
    doc_id: int
    title: str
    type: str  # 'claude', 'git', 'monitor', 'custom'
    status: str  # 'queued', 'running', 'completed', 'failed'
    command: str
    log_file: str
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    exit_code: Optional[int] = None
    # A fresh dict per instance: the previous ``None`` default contradicted the
    # annotation and made ``job.metadata[...]`` fail on directly built jobs.
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, doc_id: int, title: str, type: str, command: str) -> 'Job':
        """Build a new job in the 'queued' state.

        Args:
            doc_id: Id of the document the job belongs to.
            title: Human-readable job title.
            type: Job category; it becomes the prefix of the generated id.
            command: Shell command the job will run.

        Returns:
            A ``Job`` whose id is ``<type>-<doc_id>-<8 hex chars>`` and whose
            log file is ``~/.config/emdx/logs/<id>.log`` (``~`` is left
            unexpanded; readers call ``expanduser()``).
        """
        job_id = f"{type}-{doc_id}-{uuid.uuid4().hex[:8]}"
        return cls(
            id=job_id,
            doc_id=doc_id,
            title=title,
            type=type,
            status='queued',
            command=command,
            log_file=f"~/.config/emdx/logs/{job_id}.log",
            created_at=datetime.now(),
            metadata={},
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the job as a plain dict with timestamps as ISO-8601 strings.

        The web API in this file calls ``job.to_dict()``; unset timestamps stay
        ``None`` and ``metadata`` is copied rather than shared.
        """
        data = asdict(self)
        for key in ("created_at", "started_at", "completed_at"):
            value = data[key]
            if value is not None:
                data[key] = value.isoformat()
        return data
# -- emdx/database/migrations.py - Add to migrations
-- One row per background job; columns mirror the fields of the Job dataclass.
CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,
    doc_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    type TEXT NOT NULL,
    status TEXT NOT NULL,
    command TEXT NOT NULL,
    log_file TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    exit_code INTEGER,
    metadata JSON,
    FOREIGN KEY (doc_id) REFERENCES documents(id)
);

-- IF NOT EXISTS keeps the index statements re-runnable, matching the table
-- statement above; without it a second run of this migration fails.
-- NOTE(review): assumes a dialect that accepts CREATE INDEX IF NOT EXISTS
-- (e.g. SQLite, PostgreSQL) — confirm against the target database.
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_doc_id ON jobs(doc_id);
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at);
# emdx/services/job_runner.py
import asyncio
import subprocess
from pathlib import Path
import json
from typing import Optional
from emdx.models.jobs import Job
from emdx.database import db
class JobRunner:
    """Run job commands as shell subprocesses and capture their output in log files.

    NOTE(review): ``_update_job_in_db`` and ``_get_working_directory`` are
    called below but are not defined in the visible source — confirm they are
    provided elsewhere before relying on this class.
    """

    def __init__(self):
        # job id -> subprocess handle; an entry exists only while
        # execute_job() is waiting on that process.
        self.running_jobs: Dict[str, asyncio.subprocess.Process] = {}

    async def execute_job(self, job: "Job") -> None:
        """Execute a job asynchronously with full logging.

        Marks the job 'running', writes a header to its log file, runs
        ``job.command`` through the shell with stdout and stderr redirected
        into that file, then records 'completed' (exit code 0) or 'failed'.
        Any exception while setting up or running the command is appended to
        the log when possible and the job is stored as failed with exit
        code -1.

        Args:
            job: The job to run; its status, timestamps and exit code are
                updated in place and persisted via ``_update_job_in_db``.
        """
        job.status = 'running'
        job.started_at = datetime.now()
        self._update_job_in_db(job)

        log_path = Path(job.log_file).expanduser()
        try:
            # Inside the try so that a failure to create the directory still
            # ends with the job recorded as failed instead of stuck 'running'.
            log_path.parent.mkdir(parents=True, exist_ok=True)

            # Explicit UTF-8: titles may contain non-ASCII characters.
            with open(log_path, 'w', encoding='utf-8') as log_file:
                log_file.write(f"=== EMDX Job: {job.title} ===\n")
                log_file.write(f"ID: {job.id}\n")
                log_file.write(f"Type: {job.type}\n")
                log_file.write(f"Started: {job.started_at}\n")
                log_file.write(f"Command: {job.command}\n")
                log_file.write("=" * 50 + "\n")
                # Flush before the child starts writing to the same file so
                # the header lands ahead of the command output.
                log_file.flush()

                process = await asyncio.create_subprocess_shell(
                    job.command,
                    stdout=log_file,
                    stderr=asyncio.subprocess.STDOUT,
                    cwd=self._get_working_directory(job),
                )
                self.running_jobs[job.id] = process

                exit_code = await process.wait()
                log_file.write(f"\n=== Job completed with exit code: {exit_code} ===\n")

            job.status = 'completed' if exit_code == 0 else 'failed'
            job.completed_at = datetime.now()
            job.exit_code = exit_code
            self._update_job_in_db(job)
        except Exception as e:
            # Logging the error is best-effort: if the log itself is the
            # problem, the status update below must still happen.
            try:
                with open(log_path, 'a', encoding='utf-8') as log_file:
                    log_file.write(f"\nERROR: {str(e)}\n")
            except OSError:
                pass
            job.status = 'failed'
            job.completed_at = datetime.now()
            job.exit_code = -1
            self._update_job_in_db(job)
        finally:
            self.running_jobs.pop(job.id, None)

    def kill_job(self, job_id: str) -> bool:
        """Kill a running job.

        Args:
            job_id: Id of a job started by this runner instance.

        Returns:
            True if a terminate signal was sent; False if this runner is not
            tracking the job or its process has already exited.
        """
        process = self.running_jobs.get(job_id)
        if process is None:
            return False
        try:
            process.terminate()
        except ProcessLookupError:
            # The process finished between the lookup and the signal.
            return False
        return True
# emdx/ui/textual_browser.py - Add to BINDINGS
# Entries to append to the browser's BINDINGS list. Each one gives the key, the
# action name, the description text, and the key label to display.
# NOTE(review): the handlers for "show_logs" and "tail_last_job" are not in the
# visible source — confirm they exist.
Binding("c", "claude_execute", "Claude", key_display="c"),
Binding("g", "git_operation", "Git Op", key_display="g"),
Binding("l", "show_logs", "Logs", key_display="l"),
# NOTE(review): Textual may expect the shifted letter as "L" rather than
# "shift+l" — confirm this binding actually fires.
Binding("shift+l", "tail_last_job", "Tail Last", key_display="L"),
# emdx/ui/textual_browser.py - Add these methods
async def action_claude_execute(self):
    """Execute Claude in background with current document.

    Writes the selected document to a temporary Markdown file, creates and
    saves a 'claude' job that passes that file to ``claude-code``, and
    schedules the job on the event loop. Shows a status message and returns
    early when no document is selected or it cannot be loaded.
    """
    import asyncio
    import contextlib
    import os
    import shlex
    import tempfile

    from emdx.models.documents import get_document
    from emdx.models.jobs import Job
    from emdx.services.job_runner import JobRunner

    if not self.current_doc_id:
        self.show_status("No document selected")
        return

    doc = get_document(str(self.current_doc_id))
    if not doc:
        # Subscripting a missing document below would raise instead.
        self.show_status("Document not found")
        return

    # delete=False: the file must outlive this block so the job can read it;
    # it is removed by the done-callback registered further down.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.md', delete=False, encoding='utf-8'
    ) as f:
        f.write(f"# {doc['title']}\n{doc['content']}")
        temp_path = f.name

    # Quote the path: the command is run through a shell.
    command = f"claude-code --file {shlex.quote(temp_path)} --non-interactive"
    job = Job.create(
        doc_id=self.current_doc_id,
        title=f"Claude: {doc['title']}",
        type='claude',
        command=command,
    )

    # NOTE(review): save_job is not imported in the visible source — confirm
    # where it comes from.
    save_job(job)

    runner = JobRunner()
    task = asyncio.create_task(runner.execute_job(job))

    # The event loop only keeps weak references to tasks; hold a strong one
    # until the job finishes so it cannot be garbage-collected mid-run.
    tasks = getattr(self, "_job_tasks", None)
    if tasks is None:
        tasks = self._job_tasks = set()
    tasks.add(task)
    task.add_done_callback(tasks.discard)

    def _remove_temp_file(_task, path=temp_path):
        # Best-effort cleanup of the document copy once the job is over.
        with contextlib.suppress(OSError):
            os.unlink(path)

    task.add_done_callback(_remove_temp_file)

    self.show_status("Claude execution started → Press 'l' to view logs")
async def action_git_operation(self):
    """Create git branch and commit based on document.

    Builds a 'git' job that checks out a new ``gameplan-<doc id>-<slug>``
    branch, stages everything, and commits with a ``WIP: <title>`` message,
    then saves the job and schedules it on the event loop. Shows a status
    message and returns early when no document is selected or it cannot be
    loaded.
    """
    import asyncio
    import re
    import shlex

    from emdx.models.documents import get_document
    from emdx.models.jobs import Job
    from emdx.services.job_runner import JobRunner

    if not self.current_doc_id:
        self.show_status("No document selected")
        return

    doc = get_document(str(self.current_doc_id))
    if not doc:
        self.show_status("Document not found")
        return

    # Reduce the title to characters that are safe in a branch name; anything
    # else (spaces, quotes, shell metacharacters) collapses to a dash.
    slug = re.sub(r'[^a-z0-9_-]+', '-', doc['title'][:20].lower()).strip('-')
    branch_name = f"gameplan-{self.current_doc_id}"
    if slug:
        branch_name = f"{branch_name}-{slug}"

    # The command runs through a shell, so every value derived from the
    # document is quoted; a title containing a quote previously broke out of
    # the commit message.
    command = " && ".join([
        f"git checkout -b {shlex.quote(branch_name)}",
        "git add .",
        f"git commit -m {shlex.quote('WIP: ' + doc['title'])}",
        "echo 'Branch created and changes committed'",
    ])

    job = Job.create(
        doc_id=self.current_doc_id,
        title=f"Git: Create branch for {doc['title']}",
        type='git',
        command=command,
    )

    # NOTE(review): save_job is not imported in the visible source — confirm
    # where it comes from.
    save_job(job)

    runner = JobRunner()
    task = asyncio.create_task(runner.execute_job(job))

    # Hold a strong reference until the job finishes; the event loop alone
    # does not keep the task alive.
    tasks = getattr(self, "_job_tasks", None)
    if tasks is None:
        tasks = self._job_tasks = set()
    tasks.add(task)
    task.add_done_callback(tasks.discard)

    self.show_status("Git operation started → Press 'l' to view logs")
# emdx/ui/log_viewer.py
from textual.app import ComposeResult
from textual.screen import Screen
from textual.widgets import DataTable, RichLog, Footer, Header
from textual.containers import Horizontal, Vertical
from textual.binding import Binding
import asyncio
from pathlib import Path
class LogViewerScreen(Screen):
    """Interactive log viewer for background jobs.

    The left pane lists recent jobs and the right pane shows the log of the
    selected one.

    NOTE(review): ``get_recent_jobs``, ``get_selected_job``,
    ``_format_duration`` and the handlers for the close / cursor /
    jump_to_doc / kill_job / refresh / search bindings are not defined in the
    visible source — confirm they are provided elsewhere.
    """

    BINDINGS = [
        Binding("q", "close", "Close"),
        Binding("j", "cursor_down", "Down"),
        Binding("k", "cursor_up", "Up"),
        Binding("enter", "view_log", "View Log"),
        Binding("f", "follow_log", "Follow/Tail"),
        Binding("d", "jump_to_doc", "View Document"),
        Binding("x", "kill_job", "Kill Job"),
        Binding("r", "refresh", "Refresh"),
        Binding("/", "search", "Search Logs"),
    ]

    # Task started by action_follow_log; None while nothing is being tailed.
    tail_task = None

    def compose(self) -> ComposeResult:
        """Lay out the header, the job table, the log pane, and the footer."""
        yield Header(show_clock=True)
        with Horizontal():
            # Job list on the left
            with Vertical(id="job-list", classes="box"):
                yield DataTable(id="jobs-table")
            # Log viewer on the right
            with Vertical(id="log-viewer", classes="box"):
                yield RichLog(id="log-output", highlight=True, markup=True)
        yield Footer()

    async def on_mount(self):
        """Initialize the log viewer."""
        table = self.query_one("#jobs-table", DataTable)
        table.add_columns("Status", "Type", "Title", "Started", "Duration")
        await self.refresh_jobs()
        # Re-read the job list every two seconds.
        self.set_interval(2.0, self.refresh_jobs)

    def on_unmount(self) -> None:
        """Stop tailing when the screen goes away so the poll loop does not outlive it."""
        self._cancel_tail()

    async def refresh_jobs(self):
        """Refresh the job list."""
        jobs = get_recent_jobs(limit=50)
        table = self.query_one("#jobs-table", DataTable)

        # Clear and repopulate
        table.clear()
        for job in jobs:
            status_icon = self._get_status_icon(job.status)
            duration = self._format_duration(job)
            table.add_row(
                status_icon,
                job.type.upper(),
                job.title[:40],
                job.started_at.strftime('%H:%M:%S') if job.started_at else '-',
                duration,
            )

    def _get_status_icon(self, status: str) -> str:
        """Get icon for job status; unknown statuses map to '?'."""
        return {
            'queued': '⏳',
            'running': '●',
            'completed': '✓',
            'failed': '✗',
        }.get(status, '?')

    async def action_view_log(self):
        """View the selected job's log."""
        table = self.query_one("#jobs-table", DataTable)
        if table.cursor_row is None:
            return
        job = self.get_selected_job()
        if not job:
            return

        log_widget = self.query_one("#log-output", RichLog)
        log_widget.clear()
        log_path = Path(job.log_file).expanduser()
        if log_path.exists():
            # NOTE(review): the widget is created with markup=True, so '['
            # sequences in log text may be read as markup — confirm.
            with open(log_path, 'r') as f:
                log_widget.write(f.read())
        else:
            log_widget.write("[red]Log file not found[/red]")

    async def action_follow_log(self):
        """Tail the selected job's log; does nothing unless the job is running."""
        job = self.get_selected_job()
        if not job or job.status != 'running':
            return
        # Only one tail at a time: a second request replaces the first rather
        # than stacking another polling loop on the same widget.
        self._cancel_tail()
        self.tail_task = asyncio.create_task(self._tail_log(job))

    def _cancel_tail(self) -> None:
        """Cancel the current tail task, if any."""
        if self.tail_task is not None and not self.tail_task.done():
            self.tail_task.cancel()
        self.tail_task = None

    async def _tail_log(self, job: "Job"):
        """Tail a log file asynchronously.

        Polls the file every 0.5 s and appends whatever was written since the
        previous read.

        NOTE(review): ``job.status`` is not refreshed inside this loop, so for
        a job object that is never updated the loop only ends when the task is
        cancelled.
        """
        log_widget = self.query_one("#log-output", RichLog)
        log_path = Path(job.log_file).expanduser()

        last_pos = 0
        while job.status == 'running':
            if log_path.exists():
                with open(log_path, 'r') as f:
                    f.seek(last_pos)
                    new_content = f.read()
                    if new_content:
                        log_widget.write(new_content)
                        last_pos = f.tell()
            await asyncio.sleep(0.5)
# emdx/models/job_templates.py
class JobTemplate:
    """Predefined job templates based on document tags."""

    @staticmethod
    def get_template(doc: dict, tags: list[str]) -> Optional[str]:
        """Get appropriate job template based on tags.

        Tags are checked in priority order: gameplan, test, bug.

        Args:
            doc: Document mapping; ``doc['title']`` (and ``doc['content']``
                for gameplans) is read only for the branch that matches.
            tags: Tags attached to the document.

        Returns:
            A shell script as a string, or ``None`` when no tag has a
            template. Document text is shell-quoted, so a title or body
            containing quotes or metacharacters cannot alter the command.
        """
        import shlex

        if '🎯' in tags:  # Gameplan
            banner = shlex.quote(f"Executing gameplan: {doc['title']}")
            plan = shlex.quote(f"{doc['content']}")
            # NOTE(review): <(...) is bash process substitution — confirm the
            # shell that runs job commands supports it.
            return f"\necho {banner}\nclaude-code --execute-plan <(echo {plan})\n"
        if '🧪' in tags:  # Test
            banner = shlex.quote(f"Running tests for: {doc['title']}")
            return f"\necho {banner}\npytest -v\n"
        if '🐛' in tags:  # Bug
            banner = shlex.quote(f"Investigating bug: {doc['title']}")
            return f"\necho {banner}\ngit log --oneline -10\ngit status\n"
        return None
# emdx/services/log_analyzer.py
import re
from typing import List, Dict, Any
class LogAnalyzer:
    """Analyze job logs for patterns and insights."""

    def analyze_claude_log(self, log_content: str) -> Dict[str, Any]:
        """Extract Claude execution insights.

        Args:
            log_content: Full text of a job log.

        Returns:
            A dict of lists, one per category, holding the text that follows
            each ``Created: ``, ``Modified: ``, ``$ ``, ``ERROR: `` and
            ``WARNING: `` marker up to the end of its line.
        """
        return {
            'files_created': re.findall(r'Created: (.+)$', log_content, re.M),
            'files_modified': re.findall(r'Modified: (.+)$', log_content, re.M),
            'commands_run': re.findall(r'\$ (.+)$', log_content, re.M),
            'errors': re.findall(r'ERROR: (.+)$', log_content, re.M),
            'warnings': re.findall(r'WARNING: (.+)$', log_content, re.M),
        }

    def generate_summary(self, job: "Job") -> str:
        """Generate a summary of job execution.

        Args:
            job: Object with ``log_file`` and ``type`` attributes.

        Returns:
            ``"No log file found"`` when the log is missing; a four-line count
            summary for 'claude' jobs; a line/error count for any other type
            (previously ``None`` was returned despite the ``str`` annotation).
        """
        log_path = Path(job.log_file).expanduser()
        try:
            # Read directly instead of exists()-then-open, so a log removed in
            # between is reported as missing. Undecodable bytes from command
            # output are replaced rather than aborting the summary.
            content = log_path.read_text(encoding='utf-8', errors='replace')
        except FileNotFoundError:
            return "No log file found"

        if job.type == 'claude':
            analysis = self.analyze_claude_log(content)
            return (
                f"\nFiles created: {len(analysis['files_created'])}"
                f"\nFiles modified: {len(analysis['files_modified'])}"
                f"\nCommands run: {len(analysis['commands_run'])}"
                f"\nErrors: {len(analysis['errors'])}\n"
            )

        error_count = len(re.findall(r'ERROR: (.+)$', content, re.M))
        return f"\nLog lines: {len(content.splitlines())}\nErrors: {error_count}\n"
# emdx/web/job_dashboard.py
from flask import Flask, render_template, jsonify
import json
# Flask application object that the @app.route handlers below register on.
app = Flask(__name__)
@app.route('/jobs')
def job_dashboard():
    """Serve the job-monitoring page rendered from the 'jobs.html' template."""
    page = render_template('jobs.html')
    return page
@app.route('/api/jobs')
def api_jobs():
    """Return recent jobs (requested with limit=100) as a JSON array of dicts."""
    payload = []
    for job in get_recent_jobs(limit=100):
        payload.append(job.to_dict())
    return jsonify(payload)
@app.route('/api/jobs/<job_id>/log')
def api_job_log(job_id):
    """Return a job's log text as JSON.

    Responds 404 with an error payload when the job is unknown, and with an
    empty ``content`` string when the job exists but has no log file.
    """
    job = get_job(job_id)
    if not job:
        return jsonify({'error': 'Job not found'}), 404

    log_path = Path(job.log_file).expanduser()
    if not log_path.exists():
        return jsonify({'content': ''})

    with open(log_path, 'r') as handle:
        text = handle.read()
    return jsonify({'content': text})
# emdx/services/notifications.py
import subprocess
from typing import Optional
class JobNotifier:
    """Send notifications for job events."""

    def notify_completion(self, job: "Job"):
        """Notify when a job completes.

        Posts a desktop notification through ``osascript`` and rings the
        terminal bell. The notification is best-effort: where ``osascript``
        cannot be launched (e.g. not on macOS) only the bell is emitted.

        Args:
            job: Object with ``status``, ``title`` and ``exit_code``
                attributes; any status other than 'completed' is reported as
                a failure.
        """
        if job.status == 'completed':
            title = "✅ Job Completed"
            message = f"{job.title} finished successfully"
        else:
            title = "❌ Job Failed"
            message = f"{job.title} failed with exit code {job.exit_code}"

        # Job titles come from documents; escape them so a quote in a title
        # cannot terminate the AppleScript string early.
        script = (
            f'display notification {self._applescript_string(message)} '
            f'with title {self._applescript_string(title)}'
        )
        try:
            subprocess.run(['osascript', '-e', script], check=False)
        except OSError:
            # osascript is unavailable; the bell below still fires.
            pass

        # Terminal bell
        print('\a')

    @staticmethod
    def _applescript_string(text: str) -> str:
        """Return *text* as a double-quoted AppleScript string literal."""
        escaped = text.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'
# emdx/commands/jobs.py
import typer
from rich.console import Console
from rich.table import Table
# Typer application that the @app.command() functions below register on.
app = typer.Typer()
# Rich console used by the commands below for terminal output.
console = Console()
@app.command()
def jobs(
    status: Optional[str] = typer.Option(None, "--status", "-s"),
    limit: int = typer.Option(20, "--limit", "-n"),
    follow: Optional[str] = typer.Option(None, "--follow", "-f"),
):
    """List and manage background jobs.

    With ``--follow <job id>`` the log of that job is followed and nothing is
    listed. Otherwise recent jobs are printed as a table, optionally filtered
    by ``--status`` and capped by ``--limit``.

    NOTE(review): ``follow_job_log``, ``get_recent_jobs``,
    ``_get_status_display`` and ``_format_duration`` are not defined in the
    visible source — confirm they are provided elsewhere.
    """
    if follow:
        # Follow specific job log
        follow_job_log(follow)
        return

    # Named so the local does not shadow this command function.
    recent_jobs = get_recent_jobs(status=status, limit=limit)

    table = Table(title="EMDX Background Jobs")
    for header, style in (
        ("ID", "cyan"),
        ("Status", "green"),
        ("Type", "yellow"),
        ("Title", "white"),
        ("Duration", "magenta"),
    ):
        table.add_column(header, style=style)

    for job in recent_jobs:
        table.add_row(
            # Show the whole id: ids are "<type>-<doc id>-<hex>", so the first
            # eight characters drop the unique suffix and cannot be passed to
            # the kill/logs commands.
            job.id,
            _get_status_display(job.status),
            job.type,
            job.title[:40],
            _format_duration(job),
        )
    console.print(table)
@app.command()
def kill(job_id: str):
    """Kill a running job and report whether it was found.

    NOTE(review): a new JobRunner is created here and its ``running_jobs``
    starts empty, so this may always report "not found" — confirm how running
    processes are meant to be shared with this command.
    """
    killed = JobRunner().kill_job(job_id)
    if not killed:
        console.print(f"[red]❌ Job {job_id} not found or not running[/red]")
        return
    console.print(f"[green]✅ Killed job {job_id}[/green]")
@app.command()
def logs(job_id: str, tail: bool = typer.Option(False, "--tail", "-f")):
    """View job logs.

    With ``--tail`` and a job that is still running, the log is followed
    live; otherwise the current log content is printed once. Prints an error
    line when the job or its log file does not exist.

    NOTE(review): ``get_job`` and ``tail_job_log`` are not defined in the
    visible source — confirm they are provided elsewhere.
    """
    job = get_job(job_id)
    if not job:
        console.print("[red]Job not found[/red]")
        return

    if tail and job.status == 'running':
        # Live tail
        tail_job_log(job_id)
        return

    # Dump log
    log_path = Path(job.log_file).expanduser()
    if not log_path.exists():
        # Previously this case printed nothing at all.
        console.print("[red]Log file not found[/red]")
        return
    with open(log_path, 'r') as f:
        # markup=False: log text is arbitrary command output and must not be
        # parsed as Rich markup ("[...]" sequences would be swallowed or fail).
        console.print(f.read(), markup=False)
# This implementation provides:
- Async job execution with full logging
- Database persistence for job history
- Interactive TUI for monitoring
- CLI commands for management
- Web dashboard option
- Notifications on completion
- Log analysis capabilities
- Template system for common operations
The key is that everything runs in the background, keeping your workspace clean while providing full visibility into what's happening!