Created
October 10, 2025 17:24
-
-
Save wjkoh/558c20d285c14d8d9cfeb9a636154535 to your computer and use it in GitHub Desktop.
Go: Background Job Manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "bytes" | |
| "context" | |
| "encoding/json" | |
| "errors" | |
| "fmt" | |
| "log/slog" | |
| "sync" | |
| "time" | |
| "github.com/google/uuid" | |
| ) | |
| type Job struct { | |
| ID string `json:"id"` | |
| Status JobStatus `json:"status"` | |
| Error string `json:"error"` | |
| Created time.Time `json:"created"` | |
| Started time.Time `json:"started"` | |
| Finished time.Time `json:"finished"` | |
| Logs []json.RawMessage `json:"logs"` | |
| } | |
| type jobManager struct { | |
| workerWg sync.WaitGroup | |
| mu sync.RWMutex | |
| jobs map[string]*jobInternal | |
| jobQueue chan *jobInternal | |
| } | |
| func NewJobManager(queueSize int) *jobManager { | |
| return &jobManager{ | |
| jobs: make(map[string]*jobInternal), | |
| jobQueue: make(chan *jobInternal, queueSize), | |
| } | |
| } | |
| func (m *jobManager) Start(numWorkers int) error { | |
| if numWorkers <= 0 { | |
| return fmt.Errorf("number of workers must be positive") | |
| } | |
| m.workerWg.Add(numWorkers) | |
| for range numWorkers { | |
| go m.worker() | |
| } | |
| return nil | |
| } | |
| var ErrJobManagerAlreadyStopped = errors.New("job manager already stopped") | |
| func (m *jobManager) Stop() (err error) { | |
| m.mu.Lock() | |
| if m.jobQueue == nil { | |
| m.mu.Unlock() | |
| return ErrJobManagerAlreadyStopped | |
| } | |
| close(m.jobQueue) | |
| m.jobQueue = nil | |
| m.mu.Unlock() | |
| m.workerWg.Wait() | |
| return nil | |
| } | |
| type ExecuteFunc func(context.Context, *slog.Logger) error | |
| // TODO: Add CreateAndWait() if necessary. | |
| func (m *jobManager) Create(execute ExecuteFunc) (id string, err error) { | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| job := &jobInternal{ | |
| ctx: ctx, | |
| cancel: cancel, | |
| id: uuid.NewString(), | |
| status: JobStatusPending, | |
| created: time.Now(), | |
| execute: execute, | |
| } | |
| m.mu.Lock() | |
| defer m.mu.Unlock() | |
| if m.jobQueue == nil { | |
| return "", ErrJobManagerAlreadyStopped | |
| } | |
| select { | |
| case m.jobQueue <- job: | |
| m.jobs[job.id] = job | |
| return job.id, nil | |
| default: | |
| cancel() | |
| return "", ErrJobQueueFull | |
| } | |
| } | |
| func (m *jobManager) Get(id string) (*Job, error) { | |
| m.mu.RLock() | |
| job, ok := m.jobs[id] | |
| m.mu.RUnlock() | |
| if !ok { | |
| return nil, ErrJobNotFound | |
| } | |
| return job.toJob(), nil | |
| } | |
| func (m *jobManager) Delete(id string) error { | |
| m.mu.Lock() | |
| job, ok := m.jobs[id] | |
| if ok { | |
| delete(m.jobs, id) | |
| } | |
| m.mu.Unlock() | |
| if !ok { | |
| return ErrJobNotFound | |
| } | |
| job.cancel() | |
| return nil | |
| } | |
| func (m *jobManager) worker() { | |
| defer m.workerWg.Done() | |
| m.mu.RLock() | |
| if m.jobQueue == nil { | |
| m.mu.RUnlock() | |
| return | |
| } | |
| queue := m.jobQueue | |
| m.mu.RUnlock() | |
| for job := range queue { | |
| m.execute(job) | |
| job.cancel() | |
| } | |
| } | |
| func (m *jobManager) execute(job *jobInternal) { | |
| logger := slog.New(slog.NewJSONHandler(&logWriter{job: job}, nil)).With("jobId", job.id) | |
| logger.LogAttrs(job.ctx, slog.LevelInfo, "Job started") | |
| job.mu.Lock() | |
| job.started = time.Now() | |
| job.status = JobStatusRunning | |
| job.mu.Unlock() | |
| var err error | |
| defer func() { | |
| if r := recover(); r != nil { | |
| err = fmt.Errorf("panic: %v", r) | |
| } | |
| job.mu.Lock() | |
| job.finished = time.Now() | |
| if err == nil { | |
| job.status = JobStatusCompleted | |
| } else if errors.Is(err, context.Canceled) { | |
| job.status = JobStatusCanceled | |
| job.error = err | |
| } else { | |
| job.status = JobStatusFailed | |
| job.error = err | |
| } | |
| job.mu.Unlock() | |
| logger.LogAttrs(job.ctx, slog.LevelInfo, "Job finished") | |
| }() | |
| err = job.ctx.Err() | |
| if err != nil { | |
| return | |
| } | |
| err = job.execute(job.ctx, logger) | |
| } | |
| type jobInternal struct { | |
| ctx context.Context | |
| cancel context.CancelFunc | |
| id string | |
| execute ExecuteFunc | |
| mu sync.RWMutex | |
| status JobStatus | |
| error error | |
| created time.Time | |
| started time.Time | |
| finished time.Time | |
| logs []json.RawMessage | |
| } | |
| func (j *jobInternal) toJob() *Job { | |
| j.mu.RLock() | |
| defer j.mu.RUnlock() | |
| var errMsg string | |
| if j.error != nil { | |
| errMsg = j.error.Error() | |
| } | |
| logsCopy := make([]json.RawMessage, len(j.logs)) | |
| copy(logsCopy, j.logs) | |
| return &Job{ | |
| ID: j.id, | |
| Status: j.status, | |
| Error: errMsg, | |
| Created: j.created, | |
| Started: j.started, | |
| Finished: j.finished, | |
| Logs: logsCopy, | |
| } | |
| } | |
| type logWriter struct { | |
| job *jobInternal | |
| } | |
| func (w *logWriter) Write(p []byte) (int, error) { | |
| line := make([]byte, len(p)) | |
| copy(line, p) | |
| log := json.RawMessage(bytes.TrimSpace(line)) | |
| w.job.mu.Lock() | |
| defer w.job.mu.Unlock() | |
| w.job.logs = append(w.job.logs, log) | |
| return len(p), nil | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment