Skip to content

Instantly share code, notes, and snippets.

@wjkoh
Created October 10, 2025 17:24
Show Gist options
  • Select an option

  • Save wjkoh/558c20d285c14d8d9cfeb9a636154535 to your computer and use it in GitHub Desktop.

Select an option

Save wjkoh/558c20d285c14d8d9cfeb9a636154535 to your computer and use it in GitHub Desktop.
Go: Background Job Manager
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
)
// Job is a point-in-time snapshot of a job's state, built by
// jobInternal.toJob and handed out by jobManager.Get. It is a plain value:
// later progress of the job is not reflected in an existing snapshot.
type Job struct {
	// ID is the identifier assigned to the job by Create.
	ID string `json:"id"`

	// Status is the job's lifecycle state when the snapshot was taken.
	Status JobStatus `json:"status"`

	// Error is the message of the error the job ended with; it is empty
	// unless the job failed or was canceled.
	Error string `json:"error"`

	// Created is when the job was created.
	Created time.Time `json:"created"`

	// Started is when a worker picked the job up (zero until then).
	Started time.Time `json:"started"`

	// Finished is when the job reached a terminal state (zero until then).
	Finished time.Time `json:"finished"`

	// Logs holds the records the job logged, each stored as raw JSON.
	Logs []json.RawMessage `json:"logs"`
}
// jobManager owns a bounded queue of pending jobs, a pool of worker
// goroutines that execute them, and a registry of every job that has been
// created and not yet deleted.
type jobManager struct {
	// workerWg counts worker goroutines that have not yet returned.
	workerWg sync.WaitGroup

	// mu guards jobs and jobQueue.
	mu sync.RWMutex
	// jobs maps job IDs to their records until Delete removes them.
	jobs map[string]*jobInternal
	// jobQueue buffers jobs waiting for a worker; it is nil once Stop has
	// closed it.
	jobQueue chan *jobInternal
}
// NewJobManager returns a manager whose pending-job queue has room for
// queueSize jobs. No workers are running yet; call Start to launch them.
// A negative queueSize makes the channel allocation panic.
func NewJobManager(queueSize int) *jobManager {
	m := new(jobManager)
	m.jobs = make(map[string]*jobInternal)
	m.jobQueue = make(chan *jobInternal, queueSize)
	return m
}
// Start launches numWorkers goroutines that execute queued jobs. Nothing
// prevents calling it more than once to add further workers.
//
// It returns an error if numWorkers is not positive, and
// ErrJobManagerAlreadyStopped if Stop has already been called (workers
// launched at that point would exit without running anything).
func (m *jobManager) Start(numWorkers int) error {
	if numWorkers <= 0 {
		return errors.New("number of workers must be positive")
	}

	// Hold the lock across workerWg.Add so the Add is ordered before Stop
	// closes the queue and calls workerWg.Wait. A positive Add racing a Wait
	// on a zero counter is documented WaitGroup misuse.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.jobQueue == nil {
		return ErrJobManagerAlreadyStopped
	}

	m.workerWg.Add(numWorkers)
	for range numWorkers {
		// Each worker blocks on m.mu until Start returns, then picks up the
		// queue.
		go m.worker()
	}
	return nil
}
// ErrJobManagerAlreadyStopped is returned by Stop and Create once the
// manager has been stopped.
var (
	ErrJobManagerAlreadyStopped = errors.New("job manager already stopped")
)
// Stop closes the job queue so that no further jobs can be created, then
// blocks until every worker has returned. Workers keep draining jobs that
// were already queued, and running jobs are not canceled by Stop.
//
// Any job still sitting in the queue once the workers are gone is finalized
// as canceled, instead of being left pending forever. This happens when Start
// was never called, or when freshly started workers had not yet looked at
// the queue when it was closed.
//
// It returns ErrJobManagerAlreadyStopped if Stop has already been called.
func (m *jobManager) Stop() (err error) {
	m.mu.Lock()
	queue := m.jobQueue
	if queue == nil {
		m.mu.Unlock()
		return ErrJobManagerAlreadyStopped
	}
	close(queue)
	m.jobQueue = nil
	m.mu.Unlock()

	m.workerWg.Wait()

	// A worker that reached the queue ranges over it until it is closed and
	// empty, so anything left here was never picked up by any worker. Cancel
	// each job's context and let execute record it as canceled, exactly like
	// a job deleted before a worker reached it.
	for job := range queue {
		job.cancel()
		m.execute(job)
	}
	return nil
}
// ExecuteFunc is the body of a job. It receives the job's context, which
// Delete cancels, and a logger whose JSON records are captured in the job's
// logs. Returning nil marks the job completed. An error wrapping
// context.Canceled marks it canceled, and any other error (or a recovered
// panic) marks it failed.
type ExecuteFunc func(ctx context.Context, logger *slog.Logger) error
// TODO: Add CreateAndWait() if necessary.

// Create enqueues a new job that a worker will run with execute, and returns
// the job's ID. It never blocks. If the job cannot be handed to the queue
// immediately it returns ErrJobQueueFull, and after Stop it returns
// ErrJobManagerAlreadyStopped. On either error no job is registered.
func (m *jobManager) Create(execute ExecuteFunc) (id string, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	job := &jobInternal{
		ctx:     ctx,
		cancel:  cancel,
		id:      uuid.NewString(),
		status:  JobStatusPending,
		created: time.Now(),
		execute: execute,
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	if m.jobQueue == nil {
		// The job will never run; release its context like the queue-full
		// path does.
		cancel()
		return "", ErrJobManagerAlreadyStopped
	}
	select {
	case m.jobQueue <- job:
		// The send and the registration happen under the same lock, so Get
		// and Delete never see a queued job that is missing from the map.
		m.jobs[job.id] = job
		return job.id, nil
	default:
		cancel()
		return "", ErrJobQueueFull
	}
}
// Get returns a snapshot of the job with the given ID. It returns
// ErrJobNotFound if no such job was created or it has since been deleted.
func (m *jobManager) Get(id string) (*Job, error) {
	m.mu.RLock()
	j, found := m.jobs[id]
	m.mu.RUnlock()

	if found {
		// Snapshot outside the manager lock; toJob takes the job's own lock.
		return j.toJob(), nil
	}
	return nil, ErrJobNotFound
}
// Delete forgets the job with the given ID and cancels its context. A job
// still waiting in the queue is recorded as canceled, without running, when
// a worker reaches it. A running job is interrupted only if its ExecuteFunc
// honors the context. It returns ErrJobNotFound if no job has that ID.
func (m *jobManager) Delete(id string) error {
	m.mu.Lock()
	j, found := m.jobs[id]
	delete(m.jobs, id) // no-op when the ID is absent
	m.mu.Unlock()

	if !found {
		return ErrJobNotFound
	}
	j.cancel()
	return nil
}
// worker executes queued jobs one at a time until the queue is closed and
// drained. It returns immediately if the manager had already been stopped
// when it first looked at the queue.
func (m *jobManager) worker() {
	defer m.workerWg.Done()

	m.mu.RLock()
	queue := m.jobQueue
	m.mu.RUnlock()

	// Receiving from a nil channel blocks forever, so a stopped manager means
	// there is nothing to do.
	if queue == nil {
		return
	}

	for {
		job, ok := <-queue
		if !ok {
			return
		}
		m.execute(job)
		// The job is finished; release its context.
		job.cancel()
	}
}
// execute runs one job on the calling goroutine. It records the job's
// lifecycle on job: it marks the job running and stamps its start time, then
// at the end stamps the finish time and sets the final status and error.
// Everything the job logs through the provided logger is captured in
// job.logs.
//
// A job whose context is already canceled is not run and ends up canceled.
// A panic inside the job is recovered and recorded as a failure.
func (m *jobManager) execute(job *jobInternal) {
	sink := &logWriter{job: job}
	lg := slog.New(slog.NewJSONHandler(sink, nil)).With("jobId", job.id)
	lg.LogAttrs(job.ctx, slog.LevelInfo, "Job started")

	job.mu.Lock()
	job.status = JobStatusRunning
	job.started = time.Now()
	job.mu.Unlock()

	// runErr is captured by the deferred finalizer below, so it must be
	// assigned (not shadowed) in the body.
	var runErr error
	defer func() {
		if p := recover(); p != nil {
			runErr = fmt.Errorf("panic: %v", p)
		}

		job.mu.Lock()
		job.finished = time.Now()
		switch {
		case runErr == nil:
			job.status = JobStatusCompleted
		case errors.Is(runErr, context.Canceled):
			job.status = JobStatusCanceled
			job.error = runErr
		default:
			job.status = JobStatusFailed
			job.error = runErr
		}
		job.mu.Unlock()

		lg.LogAttrs(job.ctx, slog.LevelInfo, "Job finished")
	}()

	// Skip the work entirely if the job was canceled (e.g. deleted) while
	// still queued.
	if runErr = job.ctx.Err(); runErr == nil {
		runErr = job.execute(job.ctx, lg)
	}
}
// jobInternal is the manager's mutable record of a job. ctx, cancel, id,
// execute and created are set once in Create. status, error, started,
// finished and logs are written under mu while the job runs.
type jobInternal struct {
	// ctx is passed to execute. It is canceled by Delete, by Create if
	// enqueueing fails, and by the worker once the job finishes.
	ctx    context.Context
	cancel context.CancelFunc

	id      string
	execute ExecuteFunc

	// mu guards the fields below.
	mu       sync.RWMutex
	status   JobStatus
	error    error
	created  time.Time
	started  time.Time
	finished time.Time
	logs     []json.RawMessage // appended to by logWriter
}
// toJob returns a consistent snapshot of the job, taken under its read lock.
func (j *jobInternal) toJob() *Job {
	j.mu.RLock()
	defer j.mu.RUnlock()

	snap := &Job{
		ID:       j.id,
		Status:   j.status,
		Created:  j.created,
		Started:  j.started,
		Finished: j.finished,
		// Copy into a fresh, always non-nil slice. The snapshot then shares no
		// backing array with j.logs, which logWriter keeps appending to, and
		// it encodes as [] rather than null when empty.
		Logs: append([]json.RawMessage{}, j.logs...),
	}
	if j.error != nil {
		snap.Error = j.error.Error()
	}
	return snap
}
// logWriter is the destination execute gives to a job's slog JSON handler.
// Its Write method stores what it receives in the job's logs.
type logWriter struct {
	job *jobInternal // job whose logs receive the written records
}
// Write appends the records in p to the job's logs. p is split on newlines,
// and each line is trimmed of surrounding whitespace and stored as its own
// json.RawMessage. Blank lines are skipped. slog's JSON handler issues one
// Write per record, ending in a newline, so in practice each call adds one
// entry.
//
// The data is copied because io.Writer implementations must not retain p.
// Write never fails and always reports len(p) bytes written.
func (w *logWriter) Write(p []byte) (int, error) {
	var entries []json.RawMessage
	for _, line := range bytes.Split(p, []byte{'\n'}) {
		line = bytes.TrimSpace(line)
		if len(line) == 0 {
			// Storing a blank line would add a spurious null/empty entry to
			// the job's logs.
			continue
		}
		entries = append(entries, json.RawMessage(bytes.Clone(line)))
	}
	if len(entries) == 0 {
		return len(p), nil
	}

	w.job.mu.Lock()
	w.job.logs = append(w.job.logs, entries...)
	w.job.mu.Unlock()
	return len(p), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment