// Jira Worklogs + Summary with Original Estimate
//
/*
Jira Worklogs + Estimates Sync

Purpose:
- Sync Jira worklogs into Postgres and build a per-issue time summary with Original Estimate.

Selection:
- Processes only completed issues
- Applies a grace window (COMPLETED_GRACE_DAYS)
- Skips issues already snapshotted unless OVERWRITE_SUMMARY=true
- Optional scoping when overwriting via REBUILD_SINCE_DAYS or ISSUE_KEYS_CSV

Per-issue flow:
- Fetch all worklogs with pagination and sum time_spent
- Fetch Original Estimate from Jira (timeoriginalestimate, fallback to aggregate)
- Insert worklog rows into custom.jira_issue_time_entries
- Upsert per-issue totals into custom.jira_issue_time (time_spent, original_estimate_seconds)

Reliability and performance:
- Concurrency limited by MAX_CONCURRENCY and paced by RATE_LIMIT_DELAY_MS
- Automatic retries with exponential backoff and Retry-After support
- DRY_RUN=true logs actions without writing

Run:
- Dry run:    DRY_RUN=true node jira_sync.js
- Prod write: node jira_sync.js
*/
//
// Env vars (see the example invocation after this list):
//   PG_CONNECTION          Postgres connection string -- obtain here: https://app.getdx.com/admin/datacloud/dbusers
//   JIRA_BASE_URL          ex: https://your-domain.atlassian.net
//   JIRA_USER              Jira service account
//   JIRA_API_TOKEN         Jira API token
//   DRY_RUN                true|false (default false)
//   MAX_CONCURRENCY        default 4
//   RATE_LIMIT_DELAY_MS    default 300
//   COMPLETED_GRACE_DAYS   default 3 (only snapshot issues completed at least N days ago)
//   OVERWRITE_SUMMARY      true|false (default false)
//   REBUILD_SINCE_DAYS     optional limit when overwriting, ex: 180
//   ISSUE_KEYS_CSV         optional explicit Jira keys to process, ex: "ABC-1,ABC-2"
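//
// Example invocation (illustrative values only -- substitute your own credentials and flags):
//   PG_CONNECTION="postgresql://user:pass@host:5432/db" \
//   JIRA_BASE_URL="https://your-domain.atlassian.net" \
//   JIRA_USER="svc-jira@example.com" \
//   JIRA_API_TOKEN="<api-token>" \
//   DRY_RUN=true MAX_CONCURRENCY=4 node jira_sync.js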

const { Client } = require("pg");
const axios = require("axios");

const JIRA_BASE_URL = process.env.JIRA_BASE_URL || "https://<account>.atlassian.net";
const JIRA_USER = process.env.JIRA_USER;
const JIRA_API_TOKEN = process.env.JIRA_API_TOKEN;
const PG_CONNECTION = process.env.PG_CONNECTION;
const DRY_RUN = String(process.env.DRY_RUN || "").toLowerCase() === "true";
const MAX_CONCURRENCY = Number(process.env.MAX_CONCURRENCY || "4");
const RATE_LIMIT_DELAY_MS = Number(process.env.RATE_LIMIT_DELAY_MS || "300");
const COMPLETED_GRACE_DAYS = Number(process.env.COMPLETED_GRACE_DAYS || "3");
const OVERWRITE_SUMMARY = String(process.env.OVERWRITE_SUMMARY || "").toLowerCase() === "true";
const REBUILD_SINCE_DAYS = process.env.REBUILD_SINCE_DAYS ? Number(process.env.REBUILD_SINCE_DAYS) : null;
const ISSUE_KEYS_CSV = process.env.ISSUE_KEYS_CSV || "";
const ISSUE_KEYS = ISSUE_KEYS_CSV.split(",").map(s => s.trim()).filter(Boolean);

if (!PG_CONNECTION) throw new Error("PG_CONNECTION is required");
if (!JIRA_USER || !JIRA_API_TOKEN) throw new Error("JIRA_USER and JIRA_API_TOKEN are required");

function normalizePostgresURL(url) {
  return url.startsWith("postgres://") ? url.replace("postgres://", "postgresql://") : url;
}

const pgClient = new Client({
  connectionString: normalizePostgresURL(PG_CONNECTION),
  ssl: { rejectUnauthorized: false },
});

const http = axios.create({
  baseURL: JIRA_BASE_URL,
  auth: { username: JIRA_USER, password: JIRA_API_TOKEN },
  headers: { Accept: "application/json" },
  timeout: 30000,
});

function delay(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Backoff that respects Retry-After
async function withBackoff(fn, label, maxAttempts = 5) {
  let attempt = 0;
  while (true) {
    try {
      return await fn();
    } catch (err) {
      attempt += 1;
      const status = err.response?.status;
      const retryAfterHeader = err.response?.headers?.["retry-after"];
      // Retry-After may be a number of seconds; ignore it if it is not numeric
      const serverHintMs = retryAfterHeader ? Number(retryAfterHeader) * 1000 : null;
      if (attempt >= maxAttempts) {
        console.error(`${label} failed after ${attempt} attempts`, status || err.message);
        throw err;
      }
      const backoffMs =
        Number.isFinite(serverHintMs)
          ? serverHintMs
          : Math.min(15000, 500 * Math.pow(2, attempt - 1)) + Math.floor(Math.random() * 250);
      console.warn(`${label} attempt ${attempt} failed with ${status || err.message}. Backing off ${backoffMs} ms`);
      await delay(backoffMs);
    }
  }
}
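
// Usage sketch for withBackoff (illustrative only; this endpoint is just an example of an
// authenticated Jira call and is not used elsewhere in this script):
//   const me = await withBackoff(async () => (await http.get("/rest/api/3/myself")).data, "whoami");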

// Concurrency helper
async function mapLimit(items, limit, iterator) {
  let i = 0;
  const workers = Array.from({ length: limit }, () =>
    (async () => {
      while (true) {
        const idx = i++;
        if (idx >= items.length) break;
        try {
          await iterator(items[idx], idx);
        } catch (e) {
          console.error("Task failed", e.message);
        }
      }
    })(),
  );
  await Promise.all(workers);
}
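
// Usage sketch for mapLimit (hypothetical items, shown for illustration only):
//   await mapLimit(["ABC-1", "ABC-2", "ABC-3"], 2, async (key) => {
//     console.log("processing", key);
//   });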

// Fetch all worklogs with pagination
async function fetchAllWorklogs(issueKey) {
  const pageSize = 100;
  let startAt = 0;
  let all = [];
  while (true) {
    const url = `/rest/api/3/issue/${encodeURIComponent(issueKey)}/worklog?startAt=${startAt}&maxResults=${pageSize}`;
    const data = await withBackoff(async () => (await http.get(url)).data, `worklog fetch ${issueKey}@${startAt}`);
    const chunk = data.worklogs || [];
    all = all.concat(chunk);
    const total = typeof data.total === "number" ? data.total : startAt + chunk.length;
    startAt += chunk.length;
    if (startAt >= total || chunk.length === 0) break;
    await delay(150);
  }
  return all;
}
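
// Note: the worklog objects returned above are passed through untouched; downstream only
// id, author.accountId, created, and timeSpentSeconds are persisted (see upsertWorklogsAndSummary).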

// Fetch the Original Estimate for an issue (in seconds), falling back to the aggregate field
async function fetchIssueOriginalEstimate(issueKey) {
  const fields = "timeoriginalestimate,aggregatetimeoriginalestimate";
  try {
    const url = `/rest/api/3/issue/${encodeURIComponent(issueKey)}?fields=${fields}`;
    const data = await withBackoff(async () => (await http.get(url)).data, `issue fetch ${issueKey}`);
    const f = data?.fields || {};
    const original = f.timeoriginalestimate ?? f.aggregatetimeoriginalestimate ?? null;
    return Number.isFinite(original) ? original : null;
  } catch (err) {
    console.error(`Original estimate fetch error for ${issueKey}:`, err.response?.status || err.message);
    return null;
  }
}
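
// Destination tables are assumed to already exist in the `custom` schema; the column lists
// below are inferred from the INSERT statements in the next function, not an authoritative DDL:
//   custom.jira_issue_time_entries(source_id, issue_id, user_id, created_at, time_spent)
//     -- needs a unique constraint on source_id for ON CONFLICT (source_id)
//   custom.jira_issue_time(issue_id, source_id, time_spent, original_estimate_seconds)
//     -- needs a unique constraint on issue_id for ON CONFLICT (issue_id)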

async function upsertWorklogsAndSummary(issueId, issueKey, worklogs, originalEstimateSeconds) {
  let totalTime = 0;
  if (!DRY_RUN) await pgClient.query("BEGIN");
  try {
    for (const log of worklogs) {
      totalTime += log.timeSpentSeconds || 0;
      if (DRY_RUN) continue; // honor DRY_RUN: sum totals but skip writes
      await pgClient.query(
        `INSERT INTO custom.jira_issue_time_entries
           (source_id, issue_id, user_id, created_at, time_spent)
         VALUES ($1, $2, $3, $4, $5)
         ON CONFLICT (source_id)
         DO UPDATE SET
           user_id = EXCLUDED.user_id,
           created_at = EXCLUDED.created_at,
           time_spent = EXCLUDED.time_spent`,
        [
          parseInt(log.id, 10),
          parseInt(issueId, 10),
          log.author?.accountId || null,
          log.created,
          log.timeSpentSeconds,
        ],
      );
    }
    if (DRY_RUN) {
      console.log(`[DRY RUN] ${issueKey} time_spent=${totalTime}s original=${originalEstimateSeconds ?? "null"}`);
    } else {
      // Summary upsert with change guard to avoid unnecessary writes
      await pgClient.query(
        `INSERT INTO custom.jira_issue_time
           (issue_id, source_id, time_spent, original_estimate_seconds)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (issue_id)
         DO UPDATE SET
           time_spent = EXCLUDED.time_spent,
           original_estimate_seconds = EXCLUDED.original_estimate_seconds
         WHERE custom.jira_issue_time.time_spent IS DISTINCT FROM EXCLUDED.time_spent
            OR custom.jira_issue_time.original_estimate_seconds IS DISTINCT FROM EXCLUDED.original_estimate_seconds`,
        [parseInt(issueId, 10), issueKey, totalTime, originalEstimateSeconds],
      );
    }
  } catch (err) {
    console.error(`DB upsert error for ${issueKey}:`, err.message);
    if (!DRY_RUN) {
      try { await pgClient.query("ROLLBACK"); } catch {}
      throw err;
    }
  }
  if (!DRY_RUN) await pgClient.query("COMMIT");
}

async function main() {
  await pgClient.connect();

  // Build selection of completed issues with grace window.
  // COMPLETED_GRACE_DAYS and REBUILD_SINCE_DAYS are numeric (coerced above), so the
  // interpolation into the interval literals below stays a number literal.
  const params = [];
  let sql = `
    SELECT ji.id, ji.key
    FROM jira_issues ji
    LEFT JOIN custom.jira_issue_time jit ON jit.issue_id = ji.id
    WHERE ji.completed_at IS NOT NULL
      AND ji.completed_at <= NOW() - INTERVAL '${COMPLETED_GRACE_DAYS} days'
  `;
  if (!OVERWRITE_SUMMARY) {
    sql += ` AND jit.issue_id IS NULL `;
  }
  if (REBUILD_SINCE_DAYS && OVERWRITE_SUMMARY) {
    sql += ` AND ji.completed_at >= NOW() - INTERVAL '${REBUILD_SINCE_DAYS} days' `;
  }
  if (ISSUE_KEYS.length > 0) {
    params.push(ISSUE_KEYS);
    sql += ` AND ji.key = ANY($${params.length}) `;
  }
  sql += ` ORDER BY ji.key `;

  const res = await pgClient.query(sql, params);
  const rows = res.rows;
  console.log(
    `Loaded ${rows.length} completed issues after ${COMPLETED_GRACE_DAYS} day grace. OVERWRITE_SUMMARY=${OVERWRITE_SUMMARY} MAX_CONCURRENCY=${MAX_CONCURRENCY}`,
  );

  let processed = 0;
  const startedAt = Date.now();
  await mapLimit(rows, MAX_CONCURRENCY, async ({ id: issueId, key: issueKey }) => {
    try {
      const [worklogs, originalEstimateSeconds] = await Promise.all([
        fetchAllWorklogs(issueKey),
        fetchIssueOriginalEstimate(issueKey),
      ]);
      await upsertWorklogsAndSummary(issueId, issueKey, worklogs, originalEstimateSeconds);
    } catch (e) {
      console.error(`Failed ${issueKey}:`, e.message);
    } finally {
      processed += 1;
      if (processed % 100 === 0) {
        const elapsed = Math.round((Date.now() - startedAt) / 1000);
        console.log(`Progress ${processed}/${rows.length} in ${elapsed}s`);
      }
      await delay(RATE_LIMIT_DELAY_MS);
    }
  });

  await pgClient.end();
  console.log(`Done. Processed ${processed} issues`);
}

main().catch(async (e) => {
  console.error(e);
  try { await pgClient.end(); } catch {}
  process.exit(1);
});