Skip to content

Instantly share code, notes, and snippets.

@laphilosophia
Created January 27, 2026 08:17
Show Gist options
  • Select an option

  • Save laphilosophia/3e11aadf99c0922f7888be7fe29306f2 to your computer and use it in GitHub Desktop.

Select an option

Save laphilosophia/3e11aadf99c0922f7888be7fe29306f2 to your computer and use it in GitHub Desktop.
Persistent secure task scheduler in JavaScript
const fs = require('fs')
const path = require('path')
const crypto = require('crypto')
/**
 * Interval-based task scheduler whose task settings, circuit-breaker state and
 * metrics survive process restarts.
 *
 * Persistence:
 *  - `scheduler-state.json`: a SHA-256-checksummed snapshot, written atomically
 *    (temp file + fsync + rename) by every checkpoint.
 *  - `scheduler.wal`: newline-delimited JSON operations appended between
 *    checkpoints. Recovery loads the snapshot first and then replays the WAL
 *    on top of it.
 *
 * Callbacks cannot be serialized, so tasks restored from disk stay dormant
 * (`fn === null`) until `schedule()` is called again with the same id.
 *
 * The checksum is an unkeyed hash: it detects accidental corruption, not
 * deliberate tampering with the state directory.
 */
class PersistentSecureTaskScheduler {
  /**
   * @param {object} [options]
   * @param {(entry: object) => void} [options.auditLog=console.log] Sink for every audit event.
   * @param {number} [options.circuitBreakerThreshold=5] Consecutive failures that open a task's circuit.
   * @param {string} [options.stateDir='./app-state'] Directory for the snapshot and WAL (created with mode 0700).
   * @param {number} [options.checkpointInterval=10] Executions, summed over all tasks, between automatic checkpoints.
   */
  constructor(options = {}) {
    this.tasks = new Map()
    this.metrics = new Map()
    this.auditLog = options.auditLog || console.log
    this.circuitBreakerThreshold = options.circuitBreakerThreshold || 5
    // Persistence config
    this.stateDir = options.stateDir || './app-state'
    this.stateFile = path.join(this.stateDir, 'scheduler-state.json')
    this.walFile = path.join(this.stateDir, 'scheduler.wal')
    this.checkpointInterval = options.checkpointInterval || 10 // Checkpoint every N executions
    this.executionCounter = 0
    // Set once by shutdown(). After that no timer is armed and nothing may
    // overwrite the final snapshot.
    this.stopped = false
    this._ensureStateDir()
    this._recover()
  }

  /** Creates the state directory with owner-only permissions if it is missing. */
  _ensureStateDir() {
    if (!fs.existsSync(this.stateDir)) {
      fs.mkdirSync(this.stateDir, { recursive: true, mode: 0o700 })
    }
  }

  /**
   * Rebuilds in-memory state from disk: snapshot first, then the WAL.
   * If anything fails (bad checksum, unsupported version, malformed snapshot),
   * partial state is discarded and the corrupt files are backed up and reset.
   */
  _recover() {
    try {
      if (fs.existsSync(this.stateFile)) {
        const state = JSON.parse(fs.readFileSync(this.stateFile, 'utf8'))
        // The checksum was computed over the snapshot *without* this field.
        const storedChecksum = state.checksum
        delete state.checksum
        if (storedChecksum !== this._calculateChecksum(state)) {
          throw new Error('State file corrupted: checksum mismatch')
        }
        if (state.version !== 1) {
          throw new Error(`Unsupported state version: ${state.version}`)
        }
        this._restoreState(state)
        this.auditLog({
          event: 'state_recovered',
          tasksRestored: state.tasks.length,
          timestamp: new Date().toISOString(),
        })
      }
      // The WAL records changes made *after* the snapshot. It must be applied
      // on top of the restored tasks; replaying it first would hit empty maps.
      if (this._replayWAL() > 0) {
        // Persist the replayed changes; a successful checkpoint also removes the WAL.
        this._checkpoint()
      }
    } catch (err) {
      this.auditLog({
        event: 'recovery_failed',
        error: err.message,
        stack: err.stack,
        timestamp: new Date().toISOString(),
      })
      // Drop anything restored before the failure, then back up and reset.
      this.tasks.clear()
      this.metrics.clear()
      this._backupCorruptState()
    }
  }

  /**
   * Applies every well-formed WAL entry to the in-memory state.
   * A line that does not parse (e.g. torn by a crash mid-append) is reported
   * and skipped instead of aborting the replay. The WAL file is left in place;
   * the next successful checkpoint removes it.
   * @returns {number} Number of entries applied.
   */
  _replayWAL() {
    if (!fs.existsSync(this.walFile)) return 0
    let applied = 0
    try {
      const lines = fs
        .readFileSync(this.walFile, 'utf8')
        .split('\n')
        .filter((line) => line.trim() !== '')
      for (const [index, line] of lines.entries()) {
        let operation
        try {
          operation = JSON.parse(line)
        } catch (err) {
          this.auditLog({
            event: 'wal_entry_skipped',
            entry: index + 1,
            error: err.message,
            timestamp: new Date().toISOString(),
          })
          continue
        }
        this._applyWALOperation(operation)
        applied++
      }
    } catch (err) {
      this.auditLog({
        event: 'wal_replay_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
    return applied
  }

  /**
   * Applies one WAL operation. Entries for unknown task ids and unknown
   * operation types are ignored.
   * @param {object} op Parsed WAL entry.
   */
  _applyWALOperation(op) {
    if (op === null || typeof op !== 'object') return
    switch (op.type) {
      case 'metric_update': {
        const metrics = this.metrics.get(op.taskId)
        // Copy only known metric fields so parsed keys such as "__proto__"
        // cannot alter the metrics object.
        if (metrics) this._copyMetricFields(metrics, op.data)
        break
      }
      case 'circuit_state': {
        const task = this.tasks.get(op.taskId)
        if (task) {
          task.circuitOpen = op.open
          task.consecutiveFailures = op.failures
        }
        break
      }
      case 'task_scheduled':
        // Only needed when the checkpoint that follows schedule() did not complete.
        if (!this.tasks.has(op.taskId) && Number.isFinite(op.intervalMs) && op.intervalMs > 0) {
          this._registerTask(op.taskId, null, op.intervalMs, op.options || {})
        }
        break
      case 'task_unscheduled':
        this.tasks.delete(op.taskId)
        this.metrics.delete(op.taskId)
        break
      default:
        break
    }
  }

  /** @returns {object} A fresh, zeroed metrics record. */
  _emptyMetrics() {
    return {
      executions: 0,
      failures: 0,
      lastSuccess: null,
      lastFailure: null,
      avgDuration: 0,
      missedExecutions: 0,
    }
  }

  /**
   * Copies the known metric fields that `source` defines as own properties onto `target`.
   * @returns {object} `target`.
   */
  _copyMetricFields(target, source) {
    if (source === null || typeof source !== 'object') return target
    for (const key of Object.keys(this._emptyMetrics())) {
      if (Object.prototype.hasOwnProperty.call(source, key) && source[key] !== undefined) {
        target[key] = source[key]
      }
    }
    return target
  }

  /** Converts milliseconds (possibly fractional) to nanoseconds as a BigInt. */
  _msToNs(ms) {
    return BigInt(Math.round(ms * 1e6))
  }

  /**
   * Translates a persisted "last run" into this process's hrtime scale.
   * process.hrtime() is relative to an arbitrary point in the past, so a raw
   * value saved by another process is not reliable. The wall-clock `lastRunAt`
   * (epoch ms) is preferred. Legacy snapshots only carry the hrtime value,
   * which is used best-effort and only if it is not in the future.
   * @returns {bigint|null} Last run in hrtime nanoseconds, or null if unknown.
   */
  _lastRunToHrtime(taskState, nowNs) {
    if (Number.isFinite(taskState.lastRunAt) && taskState.lastRunAt > 0) {
      const elapsedMs = Math.max(0, Date.now() - taskState.lastRunAt)
      return nowNs - this._msToNs(elapsedMs)
    }
    if (Number.isFinite(taskState.lastRun) && taskState.lastRun > 0) {
      const legacyNs = BigInt(Math.trunc(taskState.lastRun))
      if (legacyNs <= nowNs) return legacyNs
    }
    return null
  }

  /**
   * Recreates tasks and metrics from a validated snapshot. Tasks come back
   * without a callback and stay dormant until schedule() re-attaches one.
   */
  _restoreState(state) {
    const now = process.hrtime.bigint()
    for (const taskState of state.tasks) {
      const metrics = this._copyMetricFields(this._emptyMetrics(), taskState.metrics)
      this.metrics.set(taskState.id, metrics)
      const lastRunNs = this._lastRunToHrtime(taskState, now)
      const task = {
        id: taskState.id,
        fn: null, // Set when schedule() re-attaches the callback
        intervalMs: taskState.intervalMs,
        jitterMs: taskState.jitterMs,
        timeout: taskState.timeout,
        critical: taskState.critical,
        running: false,
        // Anchor for _scheduleNext(): the next run is due one interval after it.
        // null = unknown; _scheduleNext() then anchors at the moment of re-attachment.
        nextRun: null,
        timer: null,
        consecutiveFailures: taskState.consecutiveFailures,
        circuitOpen: taskState.circuitOpen,
        // Carried over so a checkpoint taken before the next run keeps this information.
        lastRun: lastRunNs === null ? undefined : Number(lastRunNs),
        lastRunAt: taskState.lastRunAt || undefined,
      }
      if (lastRunNs !== null) {
        const intervalNs = this._msToNs(task.intervalMs)
        const dueAt = lastRunNs + intervalNs
        if (dueAt > now) {
          // The next run is still ahead: resume the previous cadence.
          task.nextRun = lastRunNs
        } else {
          // Missed while the process was down - log it and run on re-attachment.
          const drift = Number((now - dueAt) / 1000000n)
          metrics.missedExecutions++
          this.auditLog({
            event: 'recovery_drift_detected',
            taskId: task.id,
            driftMs: drift,
            timestamp: new Date().toISOString(),
          })
          // One interval back, so the next slot is "now".
          task.nextRun = now - intervalNs
        }
      }
      this.tasks.set(task.id, task)
    }
  }

  /**
   * Registers a new task, or re-attaches the callback of a task restored from disk.
   * For a restored task the persisted settings win; `intervalMs`/`options` are ignored.
   * @param {string} id Unique task id.
   * @param {() => (void|Promise<void>)} fn Callback to run each interval.
   * @param {number} intervalMs Interval between runs; must be a positive number.
   * @param {{jitterMs?: number, timeout?: number, critical?: boolean}} [options]
   * @throws {Error} If the scheduler has been shut down or the id is already active.
   * @throws {TypeError} If `fn` is not a function.
   * @throws {RangeError} If `intervalMs` is not a positive finite number.
   */
  schedule(id, fn, intervalMs, options = {}) {
    if (this.stopped) {
      throw new Error('Scheduler has been shut down')
    }
    if (typeof fn !== 'function') {
      throw new TypeError(`Task ${id}: fn must be a function`)
    }
    const existing = this.tasks.get(id)
    if (existing) {
      if (existing.fn !== null) {
        throw new Error(`Task ${id} already exists`)
      }
      // Set the fn callback after recovery
      existing.fn = fn
      this._scheduleNext(existing)
      this.auditLog({
        event: 'task_reattached',
        taskId: id,
        timestamp: new Date().toISOString(),
      })
      return
    }
    if (!(Number.isFinite(intervalMs) && intervalMs > 0)) {
      throw new RangeError(`Task ${id}: intervalMs must be a positive number`)
    }
    const task = this._registerTask(id, fn, intervalMs, options)
    this._scheduleNext(task)
    // Write to WAL then checkpoint
    this._writeWAL({
      type: 'task_scheduled',
      taskId: id,
      intervalMs,
      options,
      timestamp: Date.now(),
    })
    this._checkpoint()
    this.auditLog({
      event: 'task_scheduled',
      taskId: id,
      intervalMs,
      timestamp: new Date().toISOString(),
    })
  }

  /**
   * Creates a task record and zeroed metrics and stores both. No timer is armed.
   * @returns {object} The new task.
   */
  _registerTask(id, fn, intervalMs, options = {}) {
    const task = {
      id,
      fn,
      intervalMs,
      jitterMs: options.jitterMs || 0,
      timeout: options.timeout || intervalMs * 2,
      critical: options.critical || false,
      running: false,
      nextRun: null, // Anchored by the first _scheduleNext() call
      timer: null,
      consecutiveFailures: 0,
      circuitOpen: false,
    }
    this.tasks.set(id, task)
    this.metrics.set(id, this._emptyMetrics())
    return task
  }

  /**
   * Arms the timer for the task's next run.
   *
   * `task.nextRun` is the jitter-free anchor of the previous slot (hrtime ns);
   * if it is null, "now" is used. The next slot is one interval after the anchor.
   * Jitter only delays the timer and is not folded into the anchor, so the
   * long-run cadence stays at intervalMs. When the timer is already more than
   * one interval late, the miss is counted and the schedule is re-anchored at
   * "now", so missed slots are skipped rather than fired back-to-back.
   */
  _scheduleNext(task) {
    if (this.stopped) return
    const now = process.hrtime.bigint()
    const anchor = task.nextRun ?? now
    const slot = anchor + this._msToNs(task.intervalMs)
    const target = slot + this._msToNs(this._jitter(task.jitterMs))
    let delayMs = 0
    if (now > target) {
      const drift = Number((now - target) / 1000000n)
      if (drift > task.intervalMs) {
        const metrics = this.metrics.get(task.id)
        if (metrics) metrics.missedExecutions++
        this.auditLog({
          event: 'missed_execution',
          taskId: task.id,
          driftMs: drift,
          timestamp: new Date().toISOString(),
        })
        task.nextRun = now
      } else {
        task.nextRun = slot
      }
    } else {
      delayMs = Number((target - now) / 1000000n)
      task.nextRun = slot
    }
    task.timer = setTimeout(() => this._execute(task), delayMs)
  }

  /**
   * Runs one execution of `task` and re-arms it. It skips the run, but still
   * re-arms, when the circuit is open or a previous run is still in flight.
   * Records metrics, WAL entries and circuit-breaker transitions.
   */
  async _execute(task) {
    if (task.circuitOpen) {
      this.auditLog({
        event: 'circuit_open',
        taskId: task.id,
        timestamp: new Date().toISOString(),
      })
      this._scheduleNext(task)
      return
    }
    if (task.running) {
      const metrics = this.metrics.get(task.id)
      metrics.missedExecutions++
      this.auditLog({
        event: 'execution_skipped',
        taskId: task.id,
        reason: 'previous_still_running',
        timestamp: new Date().toISOString(),
      })
      this._scheduleNext(task)
      return
    }
    task.running = true
    const startTime = process.hrtime.bigint()
    const metrics = this.metrics.get(task.id)
    // Reports the timeout (and, for critical tasks, takes an emergency
    // checkpoint) but does not cancel task.fn; `running` stays true until fn settles.
    const timeoutHandle = setTimeout(() => {
      this.auditLog({
        event: 'task_timeout',
        taskId: task.id,
        timeoutMs: task.timeout,
        timestamp: new Date().toISOString(),
      })
      if (task.critical) {
        this._handleCriticalFailure(task)
      }
    }, task.timeout)
    try {
      await task.fn()
      const duration = Number((process.hrtime.bigint() - startTime) / 1000000n)
      clearTimeout(timeoutHandle)
      task.consecutiveFailures = 0
      metrics.executions++
      metrics.lastSuccess = new Date().toISOString()
      // Running mean over successful executions only.
      metrics.avgDuration =
        (metrics.avgDuration * (metrics.executions - 1) + duration) / metrics.executions
      // Write success to WAL
      this._writeWAL({
        type: 'metric_update',
        taskId: task.id,
        data: { ...metrics },
        timestamp: Date.now(),
      })
      this.auditLog({
        event: 'task_success',
        taskId: task.id,
        durationMs: duration,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      clearTimeout(timeoutHandle)
      task.consecutiveFailures++
      metrics.failures++
      metrics.lastFailure = new Date().toISOString()
      // Write failure to WAL
      this._writeWAL({
        type: 'metric_update',
        taskId: task.id,
        data: { ...metrics },
        timestamp: Date.now(),
      })
      this.auditLog({
        event: 'task_failure',
        taskId: task.id,
        // task.fn may throw a non-Error value.
        error: err instanceof Error ? err.message : String(err),
        consecutiveFailures: task.consecutiveFailures,
        timestamp: new Date().toISOString(),
      })
      if (task.consecutiveFailures >= this.circuitBreakerThreshold) {
        task.circuitOpen = true
        this._writeWAL({
          type: 'circuit_state',
          taskId: task.id,
          open: true,
          failures: task.consecutiveFailures,
          timestamp: Date.now(),
        })
        this.auditLog({
          event: 'circuit_breaker_open',
          taskId: task.id,
          timestamp: new Date().toISOString(),
        })
        if (task.critical) {
          this._handleCriticalFailure(task)
        }
      }
    } finally {
      task.running = false
      task.lastRun = Number(process.hrtime.bigint())
      // Wall-clock time of the run: unlike hrtime, it is still meaningful after a restart.
      task.lastRunAt = Date.now()
      // Checkpoint check
      this.executionCounter++
      if (this.executionCounter >= this.checkpointInterval) {
        this._checkpoint()
        this.executionCounter = 0
      }
    }
    // Only the task object currently registered under this id may re-arm. After
    // unschedule() (optionally followed by schedule() with the same id), or after
    // shutdown(), this execution belongs to a retired task.
    if (this.tasks.get(task.id) === task) {
      this._scheduleNext(task)
    }
  }

  /** Appends one JSON operation to the WAL; failures are audited, not thrown. */
  _writeWAL(operation) {
    try {
      fs.appendFileSync(this.walFile, JSON.stringify(operation) + '\n', { mode: 0o600 })
    } catch (err) {
      this.auditLog({
        event: 'wal_write_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * Writes a checksummed snapshot of all tasks and metrics, then deletes the WAL.
   * Failures are audited, not thrown. It does nothing after shutdown(), because
   * the task map is empty by then and writing it would erase the final snapshot.
   */
  _checkpoint() {
    if (this.stopped) return
    try {
      const state = {
        version: 1,
        timestamp: Date.now(),
        tasks: [],
      }
      for (const [id, task] of this.tasks) {
        state.tasks.push({
          id,
          intervalMs: task.intervalMs,
          jitterMs: task.jitterMs,
          timeout: task.timeout,
          critical: task.critical,
          consecutiveFailures: task.consecutiveFailures,
          circuitOpen: task.circuitOpen,
          lastRun: task.lastRun || 0, // hrtime ns; kept for older readers
          lastRunAt: task.lastRunAt || 0, // epoch ms; used for recovery timing
          metrics: { ...this.metrics.get(id) },
        })
      }
      // Checksum covers the snapshot before the checksum field is added.
      state.checksum = this._calculateChecksum(state)
      // Atomic write: fsynced temp file + rename
      const tempFile = this.stateFile + '.tmp'
      this._writeFileDurably(tempFile, JSON.stringify(state, null, 2))
      fs.renameSync(tempFile, this.stateFile)
      // WAL cleanup after checkpoint
      if (fs.existsSync(this.walFile)) {
        fs.unlinkSync(this.walFile)
      }
      this.auditLog({
        event: 'checkpoint_completed',
        tasks: state.tasks.length,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      this.auditLog({
        event: 'checkpoint_failed',
        error: err.message,
        stack: err.stack,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /**
   * Writes `contents` to `file` (mode 0600) and fsyncs it before closing.
   * Without the fsync, a crash just after the rename could leave an empty or
   * partial snapshot.
   */
  _writeFileDurably(file, contents) {
    const fd = fs.openSync(file, 'w', 0o600)
    try {
      fs.writeFileSync(fd, contents)
      fs.fsyncSync(fd)
    } finally {
      fs.closeSync(fd)
    }
  }

  /** @returns {string} Hex SHA-256 of the JSON serialization of `obj`. */
  _calculateChecksum(obj) {
    const str = JSON.stringify(obj)
    return crypto.createHash('sha256').update(str).digest('hex')
  }

  /** Moves an unreadable snapshot aside (timestamped copy) and removes the WAL. */
  _backupCorruptState() {
    try {
      const backupFile = path.join(this.stateDir, `corrupted-state-${Date.now()}.json`)
      if (fs.existsSync(this.stateFile)) {
        fs.copyFileSync(this.stateFile, backupFile)
        fs.unlinkSync(this.stateFile)
      }
      if (fs.existsSync(this.walFile)) {
        fs.unlinkSync(this.walFile)
      }
      this.auditLog({
        event: 'corrupt_state_backed_up',
        backupFile,
        timestamp: new Date().toISOString(),
      })
    } catch (err) {
      this.auditLog({
        event: 'backup_failed',
        error: err.message,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /** Audits a critical-task failure and takes an emergency checkpoint. */
  _handleCriticalFailure(task) {
    this.auditLog({
      event: 'critical_task_failure',
      taskId: task.id,
      action: 'emergency_protocol_triggered',
      timestamp: new Date().toISOString(),
    })
    // Emergency checkpoint
    this._checkpoint()
  }

  /** @returns {number} A random integer in [0, jitterMs), or 0 if jitterMs <= 0. */
  _jitter(jitterMs) {
    return jitterMs > 0 ? Math.floor(Math.random() * jitterMs) : 0
  }

  /** Stops and forgets a task (including its metrics), then checkpoints. Unknown ids are ignored. */
  unschedule(id) {
    const task = this.tasks.get(id)
    if (task) {
      clearTimeout(task.timer)
      this.tasks.delete(id)
      this.metrics.delete(id)
      this._writeWAL({
        type: 'task_unscheduled',
        taskId: id,
        timestamp: Date.now(),
      })
      this._checkpoint()
      this.auditLog({
        event: 'task_unscheduled',
        taskId: id,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /** Closes a task's circuit and clears its failure streak (logged to the WAL). */
  resetCircuitBreaker(id) {
    const task = this.tasks.get(id)
    if (task) {
      task.circuitOpen = false
      task.consecutiveFailures = 0
      this._writeWAL({
        type: 'circuit_state',
        taskId: id,
        open: false,
        failures: 0,
        timestamp: Date.now(),
      })
      this.auditLog({
        event: 'circuit_breaker_reset',
        taskId: id,
        timestamp: new Date().toISOString(),
      })
    }
  }

  /** @returns {object|undefined} The live metrics record for `id`. */
  getMetrics(id) {
    return this.metrics.get(id)
  }

  /** @returns {object} Plain object mapping task id to its live metrics record. */
  getAllMetrics() {
    return Object.fromEntries(this.metrics)
  }

  /**
   * Takes a final checkpoint, cancels all timers and stops the scheduler.
   * Idempotent: later calls are no-ops, so they cannot overwrite the snapshot
   * with an empty task list. In-flight executions finish, but they neither
   * re-arm nor checkpoint.
   */
  shutdown() {
    if (this.stopped) return
    this.auditLog({
      event: 'scheduler_shutdown',
      activeTasks: this.tasks.size,
      timestamp: new Date().toISOString(),
    })
    // Final checkpoint
    this._checkpoint()
    for (const task of this.tasks.values()) {
      clearTimeout(task.timer)
    }
    this.tasks.clear()
    this.stopped = true
  }
}
// Usage
const schedulerOptions = {
  stateDir: '/var/lib/app/state',
  auditLog: (entry) => securityLogger.log(entry),
  circuitBreakerThreshold: 3,
  checkpointInterval: 10,
}
const scheduler = new PersistentSecureTaskScheduler(schedulerOptions)

// Register the token cleanup job. After a restart, the same call re-attaches
// the callback to the task restored from disk. Interval 30 s, jitter up to
// 5 s, timeout 60 s, flagged as critical.
const cleanupTokensJob = async () => {
  await cleanupExpiredTokens()
}
scheduler.schedule('token-cleanup', cleanupTokensJob, 30000, {
  jitterMs: 5000,
  timeout: 60000,
  critical: true,
})

// Graceful shutdown: take a final checkpoint, then exit.
function handleSigterm() {
  scheduler.shutdown()
  process.exit(0)
}
process.on('SIGTERM', handleSigterm)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment