/**
 * fetch_work_item_parents.js
 *
 * Backfill:    WorkItemLinks WIQL, partitioned into Source.ChangedDate windows at DATE precision (YYYY-MM-DD)
 * Incremental: WorkItems WIQL filtered by ChangedDate at DATE precision (YYYY-MM-DD)
 * Writes:      batched multi-row upserts into custom.ado_work_item_links
 * Schema columns used: child_work_item_source_id, parent_work_item_source_id, relation_url
 * MIN_ID is applied to both parent and child IDs
 * Talks to ADO directly over HTTPS, without the DX proxy
 * Dry run writes a SQL file instead of touching the database
 */
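// For reference, a minimal sketch of the target table this script assumes; the real DDL may
// differ. Only the three columns above, plus row_created_at (read by ONE_READ_SQL) and a
// unique key over (child, parent) — required by the ON CONFLICT clause — are relied on here:
//
//   CREATE TABLE IF NOT EXISTS custom.ado_work_item_links (
//     child_work_item_source_id  bigint NOT NULL,
//     parent_work_item_source_id bigint NOT NULL,
//     relation_url               text,
//     row_created_at             timestamptz NOT NULL DEFAULT now(),
//     PRIMARY KEY (child_work_item_source_id, parent_work_item_source_id)
//   );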
const { Client } = require("pg");
const fs = require("fs");
const path = require("path");
const axios = require("axios");

// Required env
const { DX_PG_URL, ADO_PAT } = process.env;
if (!DX_PG_URL || !ADO_PAT) {
  console.error("Set DX_PG_URL and ADO_PAT");
  process.exit(1);
}

// Tunables
const MODE = (process.env.MODE || "incremental").toLowerCase();
const DRY_RUN = process.env.DRY_RUN === "1";
const OUT_FILE = process.env.OUT_FILE || "ado_links_inserts.sql";
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE || "200", 10); // fetch chunk size for workitemsbatch
const FIRST_RUN_LOOKBACK_DAYS = parseInt(process.env.FIRST_RUN_LOOKBACK_DAYS || "3650", 10);
const MIN_ID = parseInt(process.env.MIN_ID || "11339", 10);

// Write batching
const WRITE_BUFFER_SIZE = parseInt(process.env.WRITE_BUFFER_SIZE || "200", 10); // rows per upsert statement
const WRITE_FLUSH_MS = parseInt(process.env.WRITE_FLUSH_MS || "250", 10); // timer-based flush

// Pacing
const SLEEP_PER_2000_LINKS_MS = parseInt(process.env.SLEEP_PER_2000_LINKS_MS || "400", 10);
const SLEEP_BETWEEN_BATCHES_MS = parseInt(process.env.SLEEP_BETWEEN_BATCHES_MS || "300", 10);
const SLEEP_BETWEEN_PROJECTS_MS = parseInt(process.env.SLEEP_BETWEEN_PROJECTS_MS || "500", 10);

// Normalize DX connection string
const normalizePostgresURL = (url) =>
  url.startsWith("postgres://") ? url.replace("postgres://", "postgresql://") : url;

// Axios wrapper with retry and WIQL result-cap handling
let httpCalls = 0;
async function httpJson(url, { method = "GET", headers = {}, body = undefined }, attempt = 1) {
  httpCalls += 1;
  try {
    const res = await axios({
      url,
      method,
      headers,
      data: body,
      timeout: 30000,
      validateStatus: () => true
    });
    if (res.status >= 200 && res.status < 300) return res.data;
    if (res.status === 400 && /WorkItemTrackingQueryResultSizeLimitExceeded/i.test(JSON.stringify(res.data || {}))) {
      const e = new Error("WIQL_LINKS_RESULT_CAP");
      e.code = "WIQL_LINKS_RESULT_CAP";
      throw e;
    }
    if ((res.status === 429 || (res.status >= 500 && res.status < 600)) && attempt < 6) {
      const ra = parseInt(res.headers["retry-after"] || "0", 10);
      const ms = ra > 0 ? ra * 1000 : attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw new Error(`HTTP ${res.status}: ${res.statusText} | ${typeof res.data === "string" ? res.data : JSON.stringify(res.data)}`);
  } catch (err) {
    if (err && err.code === "WIQL_LINKS_RESULT_CAP") throw err;
    if (attempt < 6) {
      const ms = attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw err;
  }
}
// ADO PATs use Basic auth with an empty username and the PAT as the password
const AUTH_HEADER = { Authorization: "Basic " + Buffer.from(":" + ADO_PAT).toString("base64") };
// Projects query. Note: the links table has no org/project column, so the
// LEFT JOIN ... ON true yields one global MAX(row_created_at) watermark shared by every project.
const ONE_READ_SQL = `
  SELECT
    ao.name AS organization_name,
    ap.name AS project_name,
    MAX(l.row_created_at) AS last_ingest
  FROM ado_projects ap
  JOIN ado_organizations ao
    ON ap.organization_id::bigint = ao.id
  LEFT JOIN custom.ado_work_item_links l
    ON true
  WHERE ap.api_accessible = TRUE
    AND ap.allowlisted = TRUE
    AND ao.api_accessible = TRUE
  GROUP BY ao.name, ap.name
  ORDER BY ao.name, ap.name;
`;
// Helpers
function escapeSql(s) { return s.replace(/'/g, "''"); }
function parentIdFromUrl(url) { const n = parseInt(url.split("/").pop(), 10); return Number.isFinite(n) ? n : null; }
function chunk(arr, n) { const out = []; for (let i = 0; i < arr.length; i += n) out.push(arr.slice(i, i + n)); return out; }
async function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
function toWiqlDate(d) {
  const y = d.getUTCFullYear();
  const m = String(d.getUTCMonth() + 1).padStart(2, "0");
  const day = String(d.getUTCDate()).padStart(2, "0");
  return `${y}-${m}-${day}`;
}
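// toWiqlDate truncates to UTC date precision, which is what the WIQL date comparisons below
// expect, e.g. toWiqlDate(new Date("2024-03-05T17:20:00Z")) === "2024-03-05".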
// Build a one-row insert for the dry-run output file
function insertSql(childId, parentId, relUrl) {
  const urlSql = relUrl ? `'${escapeSql(relUrl)}'` : "NULL";
  return (
    "INSERT INTO custom.ado_work_item_links " +
    "(child_work_item_source_id, parent_work_item_source_id, relation_url) VALUES " +
    `(${childId}, ${parentId}, ${urlSql}) ` +
    "ON CONFLICT (child_work_item_source_id, parent_work_item_source_id) " +
    "DO UPDATE SET relation_url = EXCLUDED.relation_url;"
  );
}
// Buffered writer for live mode with multi-row upserts
function buildWriter(DRY_RUN, client, sqlLines) {
  if (DRY_RUN) {
    const writer = async (childId, parentId, relUrl) => {
      sqlLines.push(insertSql(childId, parentId, relUrl));
    };
    writer.flush = async () => {};
    return writer;
  }
  const buf = [];
  let flushTimer = null;
  async function flush() {
    // Cancel any pending timer so a stray flush cannot fire after COMMIT or client.end()
    if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; }
    if (buf.length === 0) return;
    const rows = buf.splice(0, buf.length);
    const values = [];
    const params = [];
    let p = 1;
    for (const r of rows) {
      values.push(`($${p++}, $${p++}, $${p++})`);
      params.push(r.childId, r.parentId, r.relUrl);
    }
    const sql = `
      INSERT INTO custom.ado_work_item_links
        (child_work_item_source_id, parent_work_item_source_id, relation_url)
      VALUES ${values.join(",")}
      ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
      DO UPDATE SET relation_url = EXCLUDED.relation_url
    `;
    await client.query(sql, params);
    await sleep(100);
  }
  function scheduleFlush() {
    if (flushTimer) return;
    flushTimer = setTimeout(async () => {
      flushTimer = null;
      try { await flush(); } catch (e) { console.error(e.message || String(e)); }
    }, WRITE_FLUSH_MS);
  }
  const writer = async (childId, parentId, relUrl) => {
    buf.push({ childId, parentId, relUrl });
    if (buf.length >= WRITE_BUFFER_SIZE) {
      await flush();
    } else {
      scheduleFlush();
    }
  };
  writer.flush = flush;
  return writer;
}
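// For illustration: a flush of two buffered rows produces a single statement shaped like
//   INSERT INTO custom.ado_work_item_links (...) VALUES ($1, $2, $3), ($4, $5, $6)
//   ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
//   DO UPDATE SET relation_url = EXCLUDED.relation_url
// with params [c1, p1, url1, c2, p2, url2], so one round-trip upserts the whole buffer.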
/**
 * Backfill using WorkItemLinks with date-only windows.
 * Returns { total, kept, dropped } for metrics.
 */
async function backfillProject(org, project, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  async function runLinksWiql(start, end) {
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    const url = `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
    return data.workItemRelations || [];
  }
  let total = 0, kept = 0, dropped = 0;
  const earliest = new Date("2005-01-01T00:00:00Z");
  const now = new Date();
  for (let d = new Date(earliest); d < now; ) {
    const end = new Date(d);
    end.setUTCDate(end.getUTCDate() + 180);
    if (end > now) end.setTime(now.getTime());
    let rels;
    try {
      rels = await runLinksWiql(d, end);
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        // Result cap hit: bisect the window and retry each half
        const mid = new Date((d.getTime() + end.getTime()) / 2);
        const a = await backfillProjectWindow(org, project, writeFn, d, mid);
        const b = await backfillProjectWindow(org, project, writeFn, mid, end);
        total += a.total + b.total;
        kept += a.kept + b.kept;
        dropped += a.dropped + b.dropped;
        d = end;
        continue;
      }
      throw e;
    }
    for (const r of rels) {
      if (!r.source || !r.target) continue; // root items arrive with a null source
      total++;
      const parentId = r.source.id;
      const childId = r.target.id;
      if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dropped++; continue; }
      if (childId < MIN_ID || parentId < MIN_ID) { dropped++; continue; }
      const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
      await writeFn(childId, parentId, relUrl);
      kept++;
      if (kept % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
    }
    d = end;
  }
  return { total, kept, dropped };
  // Helper for split windows; recurses until a window fits under the WIQL result cap
  async function backfillProjectWindow(org, project, writeFn, start, end) {
    const base2 = `https://dev.azure.com/${encodeURIComponent(org)}`;
    const url = `${base2}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    try {
      const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
      const rels = data.workItemRelations || [];
      let t = 0, k = 0, dr = 0;
      for (const r of rels) {
        if (!r.source || !r.target) continue;
        t++;
        const parentId = r.source.id;
        const childId = r.target.id;
        if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dr++; continue; }
        if (childId < MIN_ID || parentId < MIN_ID) { dr++; continue; }
        const relUrl = `${base2}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        k++;
        if (k % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
      }
      return { total: t, kept: k, dropped: dr };
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        const mid = new Date((start.getTime() + end.getTime()) / 2);
        // Guard: at DATE precision a one-day window cannot be split further; rethrow instead of recursing forever
        if (toWiqlDate(mid) === toWiqlDate(start)) throw e;
        const left = await backfillProjectWindow(org, project, writeFn, start, mid);
        const right = await backfillProjectWindow(org, project, writeFn, mid, end);
        return { total: left.total + right.total, kept: left.kept + right.kept, dropped: left.dropped + right.dropped };
      }
      throw e;
    }
  }
}
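// Worked example of the bisection above: if the 180-day window [2019-01-01, 2019-06-30)
// trips WorkItemTrackingQueryResultSizeLimitExceeded, it is retried as roughly
// [2019-01-01, 2019-04-01) and [2019-04-01, 2019-06-30), each half splitting again
// until every window fits under the cap.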
// Incremental using a ChangedDate WIQL plus workitemsbatch with $expand=relations
async function incrementalProject(org, project, sinceISO, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  // WIQL date precision requires YYYY-MM-DD without a time component
  const sinceDate = new Date(sinceISO);
  const sinceWiqlDate = toWiqlDate(sinceDate);
  const wiql = `
    SELECT [System.Id]
    FROM WorkItems
    WHERE [System.TeamProject] = '${escapeSql(project)}'
      AND [System.ChangedDate] >= '${sinceWiqlDate}'
    ORDER BY [System.ChangedDate] ASC
  `;
  const idsRes = await httpJson(
    `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`,
    { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) }
  );
  const ids = (idsRes.workItems || []).map(w => w.id).filter(Number.isFinite);
  if (ids.length === 0) return { scanned: 0, links: 0 };
  let links = 0;
  for (const slice of chunk(ids, BATCH_SIZE)) {
    // The batch API property is "$expand", and it cannot be combined with "fields";
    // relations are only returned when fields is omitted.
    const batch = await httpJson(
      `${base}/_apis/wit/workitemsbatch?api-version=7.1`,
      { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ ids: slice, "$expand": "relations" }) }
    );
    for (const wi of (batch.value || [])) {
      const childId = wi.id;
      for (const rel of (wi.relations || [])) {
        if (rel.rel !== "System.LinkTypes.Hierarchy-Reverse" || !rel.url) continue;
        const parentId = parentIdFromUrl(rel.url);
        if (!parentId) continue;
        if (childId < MIN_ID || parentId < MIN_ID) continue;
        const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        links += 1;
      }
    }
    await sleep(SLEEP_BETWEEN_BATCHES_MS);
  }
  return { scanned: ids.length, links };
}
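// For reference, a parent link in the batch response looks roughly like this (shape per the
// ADO REST docs; attribute details vary):
//   { "rel": "System.LinkTypes.Hierarchy-Reverse",
//     "url": "https://dev.azure.com/{org}/_apis/wit/workItems/12345",
//     "attributes": { "isLocked": false } }
// parentIdFromUrl() simply takes the trailing path segment, 12345 here.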
async function main() {
  const started = new Date();
  const client = new Client({ connectionString: normalizePostgresURL(DX_PG_URL), ssl: { rejectUnauthorized: false } });
  await client.connect();
  const { rows: scopes } = await client.query(ONE_READ_SQL);
  if (!scopes.length) {
    console.log("No API-accessible and allowlisted projects found");
    await client.end();
    return;
  }
  const sqlLines = [];
  const writer = buildWriter(DRY_RUN, client, sqlLines);
  let totalLinksProcessed = 0;
  try {
    for (const { organization_name: org, project_name: project, last_ingest } of scopes) {
      if (!DRY_RUN) {
        await client.query("BEGIN");
        await client.query("SET LOCAL statement_timeout = '15s'");
        await client.query("SET LOCAL lock_timeout = '5s'");
        await client.query("SET LOCAL idle_in_transaction_session_timeout = '60s'");
      }
      try {
        if (MODE === "backfill") {
          const stats = await backfillProject(org, project, writer);
          totalLinksProcessed += stats.kept;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[backfill] ${org} / ${project}: total=${stats.total} kept=${stats.kept} dropped=${stats.dropped}`);
        } else if (MODE === "incremental") {
          const since = last_ingest
            ? new Date(last_ingest)
            : new Date(Date.now() - FIRST_RUN_LOOKBACK_DAYS * 86400 * 1000);
          const { scanned, links } = await incrementalProject(org, project, since.toISOString(), writer);
          totalLinksProcessed += links;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[incremental] ${org} / ${project}: scanned=${scanned} upserted=${links}`);
        } else {
          throw new Error(`Unknown MODE: ${MODE}`);
        }
      } catch (e) {
        if (!DRY_RUN) { try { await client.query("ROLLBACK"); } catch {} }
        throw e;
      }
      await sleep(SLEEP_BETWEEN_PROJECTS_MS);
    }
    if (DRY_RUN) {
      const header = ["-- Dry run output", "BEGIN;"].join("\n");
      const footer = ["-- Cursor not advanced in dry run", "COMMIT;"].join("\n");
      const file = path.resolve(process.cwd(), OUT_FILE);
      fs.writeFileSync(file, header + "\n" + sqlLines.join("\n") + "\n" + footer + "\n", "utf8");
      console.log(`Dry run file: ${file}`);
    }
  } catch (err) {
    console.error(err.message || String(err));
    process.exitCode = 1;
  } finally {
    await client.end();
    const ended = new Date();
    console.log(`HTTP calls: ${httpCalls}`);
    console.log(`Run time: ${Math.round((ended - started) / 1000)}s`);
    console.log(`Total links processed: ${totalLinksProcessed}`);
  }
}
main().catch(err => {
  // Catch failures outside main's own try (e.g. client.connect()) so the process exits non-zero
  console.error(err.message || String(err));
  process.exit(1);
});
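// Example invocations (connection string and PAT are placeholders):
//   DX_PG_URL='postgres://user:pass@host:5432/db' ADO_PAT='xxxx' MODE=backfill DRY_RUN=1 node fetch_work_item_parents.js
//   DX_PG_URL='postgres://user:pass@host:5432/db' ADO_PAT='xxxx' MODE=incremental node fetch_work_item_parents.js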