Created
January 27, 2026 08:17
-
-
Save laphilosophia/3e11aadf99c0922f7888be7fe29306f2 to your computer and use it in GitHub Desktop.
Persistent, secure task scheduler in JavaScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const fs = require('fs') | |
| const path = require('path') | |
| const crypto = require('crypto') | |
/**
 * Interval-based task scheduler that persists task configuration, metrics and
 * circuit-breaker state to disk so they survive a process restart.
 *
 * Persistence model:
 *  - `scheduler-state.json` is a full snapshot ("checkpoint") protected by a
 *    SHA-256 checksum. The checksum detects accidental corruption only; it is
 *    not keyed, so it does not protect against deliberate tampering.
 *  - `scheduler.wal` is an append-only log of changes made since the last
 *    checkpoint. On start-up the snapshot is restored first and the log is
 *    replayed on top of it.
 *
 * Task callbacks cannot be persisted. After a restart every restored task has
 * `fn === null` and stays idle until `schedule()` is called again with the same
 * id to re-attach its callback.
 */
class PersistentSecureTaskScheduler {
  /**
   * @param {object} [options]
   * @param {(entry: object) => void} [options.auditLog] Sink for audit events (default `console.log`).
   * @param {number} [options.circuitBreakerThreshold] Consecutive failures that open the circuit (default 5).
   * @param {string} [options.stateDir] Directory for the state and WAL files (default `./app-state`).
   * @param {number} [options.checkpointInterval] Executions between automatic checkpoints (default 10).
   */
  constructor(options = {}) {
    this.tasks = new Map()
    this.metrics = new Map()
    this.auditLog = options.auditLog || console.log
    this.circuitBreakerThreshold = options.circuitBreakerThreshold || 5

    // Persistence config
    this.stateDir = options.stateDir || './app-state'
    this.stateFile = path.join(this.stateDir, 'scheduler-state.json')
    this.walFile = path.join(this.stateDir, 'scheduler.wal')
    this.checkpointInterval = options.checkpointInterval || 10 // Checkpoint every 10 executions
    this.executionCounter = 0
    // Wall-clock time (ms) of the newest snapshot known to this instance;
    // WAL entries older than this are already contained in the snapshot.
    this.lastCheckpointAt = 0

    this._ensureStateDir()
    this._recover()
  }

  /** Creates the state directory (owner-only permissions) if it is missing. */
  _ensureStateDir() {
    // `recursive: true` makes this a no-op when the directory already exists.
    fs.mkdirSync(this.stateDir, { recursive: true, mode: 0o700 })
  }

  /**
   * Restores the last snapshot and then replays the WAL on top of it.
   * Any failure while loading the snapshot discards the partially restored
   * in-memory state, moves the snapshot aside and starts empty.
   */
  _recover() {
    try {
      if (fs.existsSync(this.stateFile)) {
        const state = JSON.parse(fs.readFileSync(this.stateFile, 'utf8'))

        // Checksum validation: the checksum was computed before the
        // `checksum` key was added, so remove it before recomputing.
        const storedChecksum = state.checksum
        delete state.checksum
        if (storedChecksum !== this._calculateChecksum(state)) {
          throw new Error('State file corrupted: checksum mismatch')
        }

        // State version check
        if (state.version !== 1) {
          throw new Error(`Unsupported state version: ${state.version}`)
        }
        if (!Array.isArray(state.tasks)) {
          throw new Error('State file corrupted: tasks is not an array')
        }

        this._restoreState(state)
        this.lastCheckpointAt = Number(state.timestamp) || 0

        this.auditLog({
          event: 'state_recovered',
          tasksRestored: state.tasks.length,
          timestamp: new Date().toISOString(),
        })
      }
    } catch (err) {
      // Do not keep a half-restored task list around.
      this.tasks.clear()
      this.metrics.clear()

      this.auditLog({
        event: 'recovery_failed',
        error: err.message,
        stack: err.stack,
        timestamp: new Date().toISOString(),
      })

      // Corrupt state - backup and reset (this also removes the WAL)
      this._backupCorruptState()
      return
    }

    // The WAL only makes sense on top of the restored snapshot, so it is
    // replayed after the tasks and metrics maps have been populated.
    this._replayWAL()
  }

  /**
   * Applies every readable WAL entry to the in-memory state. Unparseable
   * lines (for example a line torn by a crash mid-append) and entries older
   * than the restored snapshot are skipped instead of aborting the replay.
   * When something was applied, a checkpoint folds it into the snapshot;
   * `_checkpoint()` removes the WAL on success.
   */
  _replayWAL() {
    if (!fs.existsSync(this.walFile)) return

    try {
      const lines = fs
        .readFileSync(this.walFile, 'utf8')
        .split('\n')
        .filter((line) => line.trim() !== '')

      let applied = 0
      let skipped = 0

      for (const line of lines) {
        let operation
        try {
          operation = JSON.parse(line)
        } catch {
          skipped++
          continue
        }

        if (operation === null || typeof operation !== 'object') {
          skipped++
          continue
        }

        // Entries written before the snapshot are already reflected in it;
        // replaying them would roll values back.
        if (typeof operation.timestamp === 'number' && operation.timestamp < this.lastCheckpointAt) {
          skipped++
          continue
        }

        this._applyWALOperation(operation)
        applied++
      }

      if (skipped > 0) {
        this.auditLog({
          event: 'wal_entries_skipped',
          skipped,
          timestamp: new Date().toISOString(),
        })
      }

      if (applied > 0) {
        this._checkpoint()
      } else {
        fs.unlinkSync(this.walFile)
      }
    } catch (err) {
      this.auditLog({
        event: 'wal_replay_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * Applies a single WAL entry to the in-memory state.
   * Unknown entry types are ignored.
   * @param {object} op Parsed WAL entry.
   */
  _applyWALOperation(op) {
    switch (op.type) {
      case 'task_scheduled': {
        // Covers a crash between the WAL append and the checkpoint in schedule().
        const validInterval = Number.isFinite(op.intervalMs) && op.intervalMs > 0
        if (validInterval && !this.tasks.has(op.taskId)) {
          const options = op.options || {}
          this.metrics.set(op.taskId, this._newMetrics())
          this.tasks.set(op.taskId, {
            id: op.taskId,
            fn: null, // re-attached by schedule()
            intervalMs: op.intervalMs,
            jitterMs: options.jitterMs || 0,
            timeout: options.timeout || op.intervalMs * 2,
            critical: options.critical || false,
            running: false,
            nextRun: process.hrtime.bigint(),
            timer: null,
            timeoutHandle: null,
            consecutiveFailures: 0,
            circuitOpen: false,
            lastRunAt: null,
          })
        }
        break
      }
      case 'task_unscheduled': {
        this.tasks.delete(op.taskId)
        this.metrics.delete(op.taskId)
        break
      }
      case 'metric_update': {
        const target = this.metrics.get(op.taskId)
        if (target && op.data !== null && typeof op.data === 'object') {
          // Copy known fields only instead of Object.assign-ing file content
          // onto the live object.
          const metricKeys = [
            'executions',
            'failures',
            'lastSuccess',
            'lastFailure',
            'avgDuration',
            'missedExecutions',
          ]
          for (const key of metricKeys) {
            if (Object.prototype.hasOwnProperty.call(op.data, key)) {
              target[key] = op.data[key]
            }
          }
        }
        break
      }
      case 'circuit_state': {
        const task = this.tasks.get(op.taskId)
        if (task) {
          task.circuitOpen = op.open
          task.consecutiveFailures = op.failures
        }
        break
      }
    }
  }

  /** @returns {object} A zeroed metrics record for a new task. */
  _newMetrics() {
    return {
      executions: 0,
      failures: 0,
      lastSuccess: null,
      lastFailure: null,
      avgDuration: 0,
      missedExecutions: 0,
    }
  }

  /**
   * Rebuilds the task and metrics maps from a validated snapshot.
   * Restored tasks have no callback and no timer until `schedule()` is called
   * for their id.
   * @param {object} state Parsed snapshot with a `tasks` array.
   * @throws {Error} When a task entry is malformed (handled by `_recover`).
   */
  _restoreState(state) {
    const nowNs = process.hrtime.bigint()
    const nowMs = Date.now()

    for (const taskState of state.tasks) {
      const intervalMs = taskState.intervalMs
      if (!Number.isFinite(intervalMs) || intervalMs <= 0) {
        throw new Error(`State file corrupted: invalid interval for task ${taskState.id}`)
      }

      // Metrics restore
      this.metrics.set(taskState.id, {
        executions: taskState.metrics.executions,
        failures: taskState.metrics.failures,
        lastSuccess: taskState.metrics.lastSuccess,
        lastFailure: taskState.metrics.lastFailure,
        avgDuration: taskState.metrics.avgDuration,
        missedExecutions: taskState.metrics.missedExecutions,
      })

      // `lastRunAt` is wall-clock ms. process.hrtime values have an arbitrary
      // origin and cannot be compared across restarts, so they are never
      // persisted. Snapshots without `lastRunAt` are treated as "just ran".
      const lastRunAt =
        Number.isFinite(taskState.lastRunAt) && taskState.lastRunAt > 0 ? taskState.lastRunAt : null

      const task = {
        id: taskState.id,
        fn: null, // This will be set with schedule()
        intervalMs,
        jitterMs: taskState.jitterMs,
        timeout: taskState.timeout,
        critical: taskState.critical,
        running: false,
        nextRun: nowNs,
        timer: null,
        timeoutHandle: null,
        consecutiveFailures: taskState.consecutiveFailures,
        circuitOpen: taskState.circuitOpen,
        lastRunAt,
      }

      const expectedNextMs = (lastRunAt ?? nowMs) + intervalMs

      if (expectedNextMs > nowMs) {
        // Never wait longer than one interval, even if the wall clock moved
        // backwards while the process was down.
        const remainingMs = Math.min(expectedNextMs - nowMs, intervalMs)
        task.nextRun = nowNs + BigInt(Math.round(remainingMs * 1000000))
      } else {
        // Missed execution - log and run as soon as the callback is attached
        this.metrics.get(task.id).missedExecutions++

        this.auditLog({
          event: 'recovery_drift_detected',
          taskId: task.id,
          driftMs: nowMs - expectedNextMs,
          timestamp: new Date().toISOString(),
        })
      }

      this.tasks.set(task.id, task)
    }
  }

  /**
   * Registers a new periodic task, or re-attaches the callback of a task that
   * was restored from disk. When re-attaching, the persisted interval and
   * options are kept and the `intervalMs` / `options` arguments are not applied.
   *
   * @param {string} id Unique task id.
   * @param {() => (void|Promise<void>)} fn Task callback.
   * @param {number} intervalMs Interval between runs in milliseconds (> 0).
   * @param {object} [options]
   * @param {number} [options.jitterMs] Upper bound of random delay added per run (default 0).
   * @param {number} [options.timeout] Milliseconds before a run is reported as timed out (default `intervalMs * 2`).
   * @param {boolean} [options.critical] Trigger the critical-failure handler on timeout / open circuit (default false).
   * @throws {TypeError} When `fn` is not a function.
   * @throws {RangeError} When `intervalMs` is not a positive finite number.
   * @throws {Error} When a task with this id is already active.
   */
  schedule(id, fn, intervalMs, options = {}) {
    // Validate before touching any state so a bad call cannot leave a
    // half-registered task behind.
    if (typeof fn !== 'function') {
      throw new TypeError(`Task ${id}: fn must be a function`)
    }
    if (!Number.isFinite(intervalMs) || intervalMs <= 0) {
      throw new RangeError(`Task ${id}: intervalMs must be a positive finite number`)
    }

    const existing = this.tasks.get(id)

    if (existing) {
      if (existing.fn !== null) {
        throw new Error(`Task ${id} already exists`)
      }

      // Set the fn callback after recovery. `nextRun` already holds the due
      // time computed during recovery, so arm the timer for exactly that
      // moment instead of adding another full interval.
      existing.fn = fn
      this._armTimer(existing, existing.nextRun)

      this.auditLog({
        event: 'task_reattached',
        taskId: id,
        timestamp: new Date().toISOString(),
      })
      return
    }

    // New task
    const task = {
      id,
      fn,
      intervalMs,
      jitterMs: options.jitterMs || 0,
      timeout: options.timeout || intervalMs * 2,
      critical: options.critical || false,
      running: false,
      nextRun: process.hrtime.bigint(),
      timer: null,
      timeoutHandle: null,
      consecutiveFailures: 0,
      circuitOpen: false,
      lastRunAt: null,
    }

    this.metrics.set(id, this._newMetrics())
    this.tasks.set(id, task)
    this._scheduleNext(task)

    // Write to WAL then checkpoint
    this._writeWAL({
      type: 'task_scheduled',
      taskId: id,
      intervalMs,
      options,
      timestamp: Date.now(),
    })
    this._checkpoint()

    this.auditLog({
      event: 'task_scheduled',
      taskId: id,
      intervalMs,
      timestamp: new Date().toISOString(),
    })
  }

  /**
   * Arms the task's timer so `_execute` runs at the given monotonic time
   * (immediately when that time has already passed).
   * @param {object} task Internal task record.
   * @param {bigint} runAtNs Target time on the `process.hrtime.bigint()` clock.
   */
  _armTimer(task, runAtNs) {
    const nowNs = process.hrtime.bigint()
    const delayMs = runAtNs > nowNs ? Number((runAtNs - nowNs) / 1000000n) : 0
    task.timer = setTimeout(() => this._execute(task), delayMs)
  }

  /**
   * Advances `task.nextRun` by one interval (plus jitter) and arms the timer
   * for it. If that moment is already more than one interval in the past, a
   * missed execution is recorded; in either late case the task runs at once.
   * @param {object} task Internal task record.
   */
  _scheduleNext(task) {
    const now = process.hrtime.bigint()
    const intervalWithJitter = task.intervalMs + this._jitter(task.jitterMs)
    // Math.round keeps BigInt() from throwing on fractional millisecond values.
    const idealNext = task.nextRun + BigInt(Math.round(intervalWithJitter * 1000000))

    if (now > idealNext) {
      const drift = Number((now - idealNext) / 1000000n)
      if (drift > task.intervalMs) {
        this.metrics.get(task.id).missedExecutions++

        this.auditLog({
          event: 'missed_execution',
          taskId: task.id,
          driftMs: drift,
          timestamp: new Date().toISOString(),
        })
      }
    }

    task.nextRun = idealNext
    this._armTimer(task, idealNext)
  }

  /**
   * Runs the task callback once, records metrics, drives the circuit breaker
   * and schedules the next run.
   *
   * The timeout is a watchdog only: it emits `task_timeout` (and triggers the
   * critical-failure handler for critical tasks) but does not cancel the
   * callback.
   *
   * If the task is unscheduled or the scheduler is shut down while the
   * callback is in flight, the run still updates its metrics object but does
   * not write to the WAL, checkpoint, or reschedule.
   * @param {object} task Internal task record.
   */
  async _execute(task) {
    if (task.circuitOpen) {
      this.auditLog({
        event: 'circuit_open',
        taskId: task.id,
        timestamp: new Date().toISOString(),
      })
      this._scheduleNext(task)
      return
    }

    if (task.running) {
      this.metrics.get(task.id).missedExecutions++

      this.auditLog({
        event: 'execution_skipped',
        taskId: task.id,
        reason: 'previous_still_running',
        timestamp: new Date().toISOString(),
      })
      this._scheduleNext(task)
      return
    }

    // True while this exact task object is still the registered one.
    const isLive = () => this.tasks.get(task.id) === task

    task.running = true
    const startTime = process.hrtime.bigint()
    const metrics = this.metrics.get(task.id)

    task.timeoutHandle = setTimeout(() => {
      task.timeoutHandle = null

      this.auditLog({
        event: 'task_timeout',
        taskId: task.id,
        timeoutMs: task.timeout,
        timestamp: new Date().toISOString(),
      })

      if (task.critical && isLive()) {
        this._handleCriticalFailure(task)
      }
    }, task.timeout)

    try {
      await task.fn()

      const duration = Number((process.hrtime.bigint() - startTime) / 1000000n)

      task.consecutiveFailures = 0
      metrics.executions++
      metrics.lastSuccess = new Date().toISOString()
      // Incremental mean over successful executions.
      metrics.avgDuration =
        (metrics.avgDuration * (metrics.executions - 1) + duration) / metrics.executions

      // Write success to WAL
      if (isLive()) {
        this._writeWAL({
          type: 'metric_update',
          taskId: task.id,
          data: { ...metrics },
          timestamp: Date.now(),
        })
      }

      this.auditLog({
        event: 'task_success',
        taskId: task.id,
        durationMs: duration,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      // Callbacks may throw non-Error values (including null/undefined).
      const message = err instanceof Error ? err.message : String(err)

      task.consecutiveFailures++
      metrics.failures++
      metrics.lastFailure = new Date().toISOString()

      // Write failure to WAL
      if (isLive()) {
        this._writeWAL({
          type: 'metric_update',
          taskId: task.id,
          data: { ...metrics },
          timestamp: Date.now(),
        })
      }

      this.auditLog({
        event: 'task_failure',
        taskId: task.id,
        error: message,
        consecutiveFailures: task.consecutiveFailures,
        timestamp: new Date().toISOString(),
      })

      if (task.consecutiveFailures >= this.circuitBreakerThreshold) {
        task.circuitOpen = true

        if (isLive()) {
          this._writeWAL({
            type: 'circuit_state',
            taskId: task.id,
            open: true,
            failures: task.consecutiveFailures,
            timestamp: Date.now(),
          })
        }

        this.auditLog({
          event: 'circuit_breaker_open',
          taskId: task.id,
          timestamp: new Date().toISOString(),
        })

        if (task.critical && isLive()) {
          this._handleCriticalFailure(task)
        }
      }
    } finally {
      clearTimeout(task.timeoutHandle)
      task.timeoutHandle = null
      task.running = false
      // Wall-clock time: this value is persisted and read by another process.
      task.lastRunAt = Date.now()

      // Checkpoint check. Skipped for stale runs so a run finishing after
      // shutdown()/unschedule() cannot overwrite the snapshot.
      if (isLive()) {
        this.executionCounter++
        if (this.executionCounter >= this.checkpointInterval) {
          this._checkpoint()
          this.executionCounter = 0
        }
      }
    }

    if (isLive()) {
      this._scheduleNext(task)
    }
  }

  /**
   * Appends one JSON line to the WAL. Failures are logged, not thrown.
   * @param {object} operation JSON-serialisable WAL entry.
   */
  _writeWAL(operation) {
    try {
      fs.appendFileSync(this.walFile, JSON.stringify(operation) + '\n', { mode: 0o600 })
    } catch (err) {
      this.auditLog({
        event: 'wal_write_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * Writes a full snapshot of all tasks and metrics (temp file + rename) and
   * then removes the WAL. Failures are logged, not thrown; on failure the WAL
   * is left in place.
   */
  _checkpoint() {
    try {
      const state = {
        version: 1,
        timestamp: Date.now(),
        tasks: [],
      }

      for (const [id, task] of this.tasks) {
        state.tasks.push({
          id,
          intervalMs: task.intervalMs,
          jitterMs: task.jitterMs,
          timeout: task.timeout,
          critical: task.critical,
          consecutiveFailures: task.consecutiveFailures,
          circuitOpen: task.circuitOpen,
          lastRunAt: task.lastRunAt ?? null,
          metrics: { ...this.metrics.get(id) },
        })
      }

      // Add checksum (computed over the state without the checksum key)
      state.checksum = this._calculateChecksum(state)

      // Atomic write: temp file + rename
      const tempFile = this.stateFile + '.tmp'
      fs.writeFileSync(tempFile, JSON.stringify(state, null, 2), { mode: 0o600 })
      fs.renameSync(tempFile, this.stateFile)
      this.lastCheckpointAt = state.timestamp

      // WAL cleanup after checkpoint
      if (fs.existsSync(this.walFile)) {
        fs.unlinkSync(this.walFile)
      }

      this.auditLog({
        event: 'checkpoint_completed',
        tasks: state.tasks.length,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      this.auditLog({
        event: 'checkpoint_failed',
        error: err.message,
        stack: err.stack,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * @param {object} obj JSON-serialisable value.
   * @returns {string} Hex SHA-256 of `JSON.stringify(obj)`.
   */
  _calculateChecksum(obj) {
    const str = JSON.stringify(obj)
    return crypto.createHash('sha256').update(str).digest('hex')
  }

  /**
   * Moves an unreadable snapshot to `corrupted-state-<epoch ms>.json` and
   * deletes the WAL so the scheduler can start from an empty state.
   * Failures are logged, not thrown.
   */
  _backupCorruptState() {
    try {
      const backupFile = path.join(this.stateDir, `corrupted-state-${Date.now()}.json`)

      if (fs.existsSync(this.stateFile)) {
        fs.copyFileSync(this.stateFile, backupFile)
        fs.unlinkSync(this.stateFile)
      }

      if (fs.existsSync(this.walFile)) {
        fs.unlinkSync(this.walFile)
      }

      this.auditLog({
        event: 'corrupt_state_backed_up',
        backupFile,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      this.auditLog({
        event: 'backup_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * Logs a critical-task failure and forces a checkpoint.
   * @param {object} task Internal task record.
   */
  _handleCriticalFailure(task) {
    this.auditLog({
      event: 'critical_task_failure',
      taskId: task.id,
      action: 'emergency_protocol_triggered',
      timestamp: new Date().toISOString(),
    })

    // Emergency checkpoint
    this._checkpoint()
  }

  /**
   * @param {number} jitterMs Exclusive upper bound in milliseconds.
   * @returns {number} Random integer in `[0, jitterMs)`, or 0 when `jitterMs <= 0`.
   */
  _jitter(jitterMs) {
    return jitterMs > 0 ? Math.floor(Math.random() * jitterMs) : 0
  }

  /**
   * Stops and forgets a task and persists the removal. Unknown ids are ignored.
   * @param {string} id Task id.
   */
  unschedule(id) {
    const task = this.tasks.get(id)
    if (!task) return

    clearTimeout(task.timer)
    clearTimeout(task.timeoutHandle)
    this.tasks.delete(id)
    this.metrics.delete(id)

    this._writeWAL({
      type: 'task_unscheduled',
      taskId: id,
      timestamp: Date.now(),
    })
    this._checkpoint()

    this.auditLog({
      event: 'task_unscheduled',
      taskId: id,
      timestamp: new Date().toISOString(),
    })
  }

  /**
   * Closes the circuit for a task and clears its failure streak.
   * Unknown ids are ignored.
   * @param {string} id Task id.
   */
  resetCircuitBreaker(id) {
    const task = this.tasks.get(id)
    if (!task) return

    task.circuitOpen = false
    task.consecutiveFailures = 0

    this._writeWAL({
      type: 'circuit_state',
      taskId: id,
      open: false,
      failures: 0,
      timestamp: Date.now(),
    })

    this.auditLog({
      event: 'circuit_breaker_reset',
      taskId: id,
      timestamp: new Date().toISOString(),
    })
  }

  /**
   * @param {string} id Task id.
   * @returns {object|undefined} The live metrics record for the task, if any.
   */
  getMetrics(id) {
    return this.metrics.get(id)
  }

  /** @returns {object} Plain object mapping task id to its metrics record. */
  getAllMetrics() {
    return Object.fromEntries(this.metrics)
  }

  /**
   * Writes a final checkpoint, cancels every pending timer (including
   * timeout watchdogs of in-flight runs) and drops all tasks.
   */
  shutdown() {
    this.auditLog({
      event: 'scheduler_shutdown',
      activeTasks: this.tasks.size,
      timestamp: new Date().toISOString(),
    })

    // Final checkpoint
    this._checkpoint()

    for (const task of this.tasks.values()) {
      clearTimeout(task.timer)
      clearTimeout(task.timeoutHandle)
    }

    this.tasks.clear()
  }
}
// Usage example: wires the scheduler to an application-provided
// `securityLogger` and `cleanupExpiredTokens` (both defined elsewhere).
const schedulerOptions = {
  stateDir: '/var/lib/app/state',
  auditLog: (entry) => securityLogger.log(entry),
  circuitBreakerThreshold: 3,
  checkpointInterval: 10,
}
const scheduler = new PersistentSecureTaskScheduler(schedulerOptions)

// Call this on every start: it registers the task on first run and
// re-attaches the callback to the persisted task after a recovery.
const runTokenCleanup = async () => {
  await cleanupExpiredTokens()
}
const tokenCleanupOptions = {
  jitterMs: 5000,
  timeout: 60000,
  critical: true,
}
scheduler.schedule('token-cleanup', runTokenCleanup, 30000, tokenCleanupOptions)

// Graceful shutdown: persist a final checkpoint, then exit.
const handleSigterm = () => {
  scheduler.shutdown()
  process.exit(0)
}
process.on('SIGTERM', handleSigterm)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment