/**
 * fetch_work_item_parents.js
 *
 * Backfill: WorkItemLinks WIQL, partitioned by Source.ChangedDate windows using DATE precision (YYYY-MM-DD)
 * Incremental: WorkItems WIQL by ChangedDate with DATE-precision filter (YYYY-MM-DD)
 * Writes: batched multi-row upserts into custom.ado_work_item_links
 * Schema columns used: child_work_item_source_id, parent_work_item_source_id, relation_url
 * MIN_ID floor applied to both parent and child IDs
 * Direct ADO HTTPS calls, no DX proxy
 * Dry run writes a SQL file instead of touching the database
 */
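// Example invocations (a sketch; the connection string, PAT, and file name below are placeholders):
//
//   DX_PG_URL=postgresql://user:pass@host:5432/dx ADO_PAT=xxxx \
//     MODE=backfill DRY_RUN=1 OUT_FILE=links.sql node fetch_work_item_parents.js
//
//   DX_PG_URL=postgresql://user:pass@host:5432/dx ADO_PAT=xxxx \
//     MODE=incremental node fetch_work_item_parents.js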
const { Client } = require("pg");
const fs = require("fs");
const path = require("path");
const axios = require("axios");
// Required env
const { DX_PG_URL, ADO_PAT } = process.env;
if (!DX_PG_URL || !ADO_PAT) {
  console.error("Set DX_PG_URL and ADO_PAT");
  process.exit(1);
}
// Tunables
const MODE = (process.env.MODE || "incremental").toLowerCase();
const DRY_RUN = process.env.DRY_RUN === "1";
const OUT_FILE = process.env.OUT_FILE || "ado_links_inserts.sql";
const BATCH_SIZE = parseInt(process.env.BATCH_SIZE || "200", 10); // fetch chunk size for workitemsbatch
const FIRST_RUN_LOOKBACK_DAYS = parseInt(process.env.FIRST_RUN_LOOKBACK_DAYS || "3650", 10);
const MIN_ID = parseInt(process.env.MIN_ID || "11339", 10);
// Write batching
const WRITE_BUFFER_SIZE = parseInt(process.env.WRITE_BUFFER_SIZE || "200", 10); // rows per upsert statement
const WRITE_FLUSH_MS = parseInt(process.env.WRITE_FLUSH_MS || "250", 10); // timer based flush
// Pacing
const SLEEP_PER_2000_LINKS_MS = parseInt(process.env.SLEEP_PER_2000_LINKS_MS || "400", 10);
const SLEEP_BETWEEN_BATCHES_MS = parseInt(process.env.SLEEP_BETWEEN_BATCHES_MS || "300", 10);
const SLEEP_BETWEEN_PROJECTS_MS = parseInt(process.env.SLEEP_BETWEEN_PROJECTS_MS || "500", 10);
// Normalize DX connection string
const normalizePostgresURL = (url) =>
  url.startsWith("postgres://") ? url.replace("postgres://", "postgresql://") : url;
// Axios wrapper with retry and WIQL cap handling
let httpCalls = 0;
async function httpJson(url, { method = "GET", headers = {}, body = undefined }, attempt = 1) {
  httpCalls += 1;
  try {
    const res = await axios({
      url,
      method,
      headers,
      data: body,
      timeout: 30000,
      validateStatus: () => true
    });
    if (res.status >= 200 && res.status < 300) return res.data;
    if (res.status === 400 && /WorkItemTrackingQueryResultSizeLimitExceeded/i.test(JSON.stringify(res.data || {}))) {
      const e = new Error("WIQL_LINKS_RESULT_CAP");
      e.code = "WIQL_LINKS_RESULT_CAP";
      throw e;
    }
    if ((res.status === 429 || (res.status >= 500 && res.status < 600)) && attempt < 6) {
      const ra = parseInt(res.headers["retry-after"] || "0", 10);
      const ms = ra > 0 ? ra * 1000 : attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw new Error(`HTTP ${res.status}: ${res.statusText} | ${typeof res.data === "string" ? res.data : JSON.stringify(res.data)}`);
  } catch (err) {
    if (err && err.code === "WIQL_LINKS_RESULT_CAP") throw err;
    if (attempt < 6) {
      const ms = attempt * 1000;
      await new Promise(r => setTimeout(r, ms));
      return httpJson(url, { method, headers, body }, attempt + 1);
    }
    throw err;
  }
}
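// Retry sketch: a 429 carrying Retry-After: 5 sleeps 5s before retrying; other retryable
// statuses and network errors back off linearly (1s, 2s, ...) for up to five retries.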
// ADO PATs use Basic auth with an empty username and the PAT as the password
const AUTH_HEADER = { Authorization: "Basic " + Buffer.from(":" + ADO_PAT).toString("base64") };
// Projects query. Note: the LEFT JOIN has no join key (ON true), so last_ingest is the
// global MAX(row_created_at) across the whole links table, shared by every project row;
// the links table carries no org/project columns to scope it per project.
const ONE_READ_SQL = `
  SELECT
    ao.name AS organization_name,
    ap.name AS project_name,
    MAX(l.row_created_at) AS last_ingest
  FROM ado_projects ap
  JOIN ado_organizations ao
    ON ap.organization_id::bigint = ao.id
  LEFT JOIN custom.ado_work_item_links l
    ON true
  WHERE ap.api_accessible = TRUE
    AND ap.allowlisted = TRUE
    AND ao.api_accessible = TRUE
  GROUP BY ao.name, ap.name
  ORDER BY ao.name, ap.name;
`;
// Helpers
function escapeSql(s) { return s.replace(/'/g, "''"); }
function parentIdFromUrl(url) { const n = parseInt(url.split("/").pop(), 10); return Number.isFinite(n) ? n : null; }
function chunk(arr, n) { const out = []; for (let i = 0; i < arr.length; i += n) out.push(arr.slice(i, i + n)); return out; }
async function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
function toWiqlDate(d) {
  const y = d.getUTCFullYear();
  const m = String(d.getUTCMonth() + 1).padStart(2, "0");
  const day = String(d.getUTCDate()).padStart(2, "0");
  return `${y}-${m}-${day}`;
}
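// e.g. toWiqlDate(new Date("2024-03-05T17:20:00Z")) === "2024-03-05" (UTC calendar date)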
// Build a single-row INSERT for the dry-run output file
function insertSql(childId, parentId, relUrl) {
  const urlSql = relUrl ? `'${escapeSql(relUrl)}'` : "NULL";
  return (
    "INSERT INTO custom.ado_work_item_links " +
    "(child_work_item_source_id, parent_work_item_source_id, relation_url) VALUES " +
    `(${childId}, ${parentId}, ${urlSql}) ` +
    "ON CONFLICT (child_work_item_source_id, parent_work_item_source_id) " +
    "DO UPDATE SET relation_url = EXCLUDED.relation_url;"
  );
}
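// For example, a kept link (child 1345, parent 1201; illustrative ids) renders as one
// statement, wrapped here for readability:
//   INSERT INTO custom.ado_work_item_links (child_work_item_source_id, parent_work_item_source_id, relation_url)
//   VALUES (1345, 1201, 'https://dev.azure.com/org/_apis/wit/workItems/1201')
//   ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
//   DO UPDATE SET relation_url = EXCLUDED.relation_url;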
// Buffered writer for live mode with multi-row upserts
function buildWriter(DRY_RUN, client, sqlLines) {
  if (DRY_RUN) {
    const writer = async (childId, parentId, relUrl) => {
      sqlLines.push(insertSql(childId, parentId, relUrl));
    };
    writer.flush = async () => {};
    return writer;
  }
  const buf = [];
  let flushTimer = null;
  async function flush() {
    // Cancel any pending timer flush; this call drains the buffer itself
    if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; }
    if (buf.length === 0) return;
    const rows = buf.splice(0, buf.length);
    const values = [];
    const params = [];
    let p = 1;
    for (const r of rows) {
      values.push(`($${p++}, $${p++}, $${p++})`);
      params.push(r.childId, r.parentId, r.relUrl);
    }
    const sql = `
      INSERT INTO custom.ado_work_item_links
        (child_work_item_source_id, parent_work_item_source_id, relation_url)
      VALUES ${values.join(",")}
      ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
      DO UPDATE SET relation_url = EXCLUDED.relation_url
    `;
    await client.query(sql, params);
    await sleep(100);
  }
  function scheduleFlush() {
    if (flushTimer) return;
    flushTimer = setTimeout(async () => {
      flushTimer = null;
      try { await flush(); } catch (e) { console.error(e.message || String(e)); }
    }, WRITE_FLUSH_MS);
  }
  const writer = async (childId, parentId, relUrl) => {
    buf.push({ childId, parentId, relUrl });
    if (buf.length >= WRITE_BUFFER_SIZE) {
      await flush();
    } else {
      scheduleFlush();
    }
  };
  writer.flush = flush;
  return writer;
}
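// With, say, two buffered rows, flush() issues one parameterized upsert of the form:
//   INSERT INTO custom.ado_work_item_links (child_work_item_source_id, parent_work_item_source_id, relation_url)
//   VALUES ($1, $2, $3), ($4, $5, $6)
//   ON CONFLICT (child_work_item_source_id, parent_work_item_source_id)
//   DO UPDATE SET relation_url = EXCLUDED.relation_url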
/**
 * Backfill using WorkItemLinks with date-only windows.
 * Returns { total, kept, dropped } for metrics.
 */
async function backfillProject(org, project, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  async function runLinksWiql(start, end) {
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    const url = `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
    return data.workItemRelations || [];
  }
  let total = 0, kept = 0, dropped = 0;
  const earliest = new Date("2005-01-01T00:00:00Z");
  const now = new Date();
  for (let d = new Date(earliest); d < now; ) {
    const end = new Date(d);
    end.setUTCDate(end.getUTCDate() + 180);
    if (end > now) end.setTime(now.getTime());
    let rels;
    try {
      rels = await runLinksWiql(d, end);
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        // Result cap hit: bisect the window and recurse until each half fits
        const mid = new Date((d.getTime() + end.getTime()) / 2);
        const a = await backfillProjectWindow(org, project, writeFn, d, mid);
        const b = await backfillProjectWindow(org, project, writeFn, mid, end);
        total += a.total + b.total;
        kept += a.kept + b.kept;
        dropped += a.dropped + b.dropped;
        d = end;
        continue;
      }
      throw e;
    }
    for (const r of rels) {
      if (!r.source || !r.target) continue;
      total++;
      const parentId = r.source.id;
      const childId = r.target.id;
      if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dropped++; continue; }
      if (childId < MIN_ID || parentId < MIN_ID) { dropped++; continue; }
      const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
      await writeFn(childId, parentId, relUrl);
      kept++;
      if (kept % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
    }
    d = end;
  }
  return { total, kept, dropped };
  // Helper for split windows (a function declaration, so it hoists above the return)
  async function backfillProjectWindow(org, project, writeFn, start, end) {
    const base2 = `https://dev.azure.com/${encodeURIComponent(org)}`;
    const url = `${base2}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`;
    const wiql = `
      SELECT [System.Id]
      FROM WorkItemLinks
      WHERE
        [Source].[System.TeamProject] = '${escapeSql(project)}'
        AND [Source].[System.ChangedDate] >= '${toWiqlDate(start)}'
        AND [Source].[System.ChangedDate] < '${toWiqlDate(end)}'
        AND [System.Links.LinkType] = 'System.LinkTypes.Hierarchy-Forward'
      MODE(Recursive)
    `;
    try {
      const data = await httpJson(url, { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) });
      const rels = data.workItemRelations || [];
      let t = 0, k = 0, dr = 0;
      for (const r of rels) {
        if (!r.source || !r.target) continue;
        t++;
        const parentId = r.source.id;
        const childId = r.target.id;
        if (!Number.isFinite(parentId) || !Number.isFinite(childId)) { dr++; continue; }
        if (childId < MIN_ID || parentId < MIN_ID) { dr++; continue; }
        const relUrl = `${base2}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        k++;
        if (k % 2000 === 0) await sleep(SLEEP_PER_2000_LINKS_MS);
      }
      return { total: t, kept: k, dropped: dr };
    } catch (e) {
      if (e && e.code === "WIQL_LINKS_RESULT_CAP") {
        const mid = new Date((start.getTime() + end.getTime()) / 2);
        const left = await backfillProjectWindow(org, project, writeFn, start, mid);
        const right = await backfillProjectWindow(org, project, writeFn, mid, end);
        return { total: left.total + right.total, kept: left.kept + right.kept, dropped: left.dropped + right.dropped };
      }
      throw e;
    }
  }
}
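// For reference, workItemRelations entries in the links WIQL response look roughly like
// (illustrative ids; tree roots arrive with source = null, hence the !r.source guard):
//   { "rel": "System.LinkTypes.Hierarchy-Forward",
//     "source": { "id": 1201, "url": "https://dev.azure.com/org/_apis/wit/workItems/1201" },
//     "target": { "id": 1345, "url": "https://dev.azure.com/org/_apis/wit/workItems/1345" } }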
// Incremental using ChangedDate WIQL and workitemsbatch $expand=relations
async function incrementalProject(org, project, sinceISO, writeFn) {
  const base = `https://dev.azure.com/${encodeURIComponent(org)}`;
  // WIQL date precision requires YYYY-MM-DD without time
  const sinceDate = new Date(sinceISO);
  const sinceWiqlDate = toWiqlDate(sinceDate);
  const wiql = `
    SELECT [System.Id]
    FROM WorkItems
    WHERE [System.TeamProject] = '${escapeSql(project)}'
      AND [System.ChangedDate] >= '${sinceWiqlDate}'
    ORDER BY [System.ChangedDate] ASC
  `;
  const idsRes = await httpJson(
    `${base}/${encodeURIComponent(project)}/_apis/wit/wiql?api-version=7.1-preview.2`,
    { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ query: wiql }) }
  );
  const ids = (idsRes.workItems || []).map(w => w.id).filter(Number.isFinite);
  if (ids.length === 0) return { scanned: 0, links: 0 };
  let links = 0;
  for (const slice of chunk(ids, BATCH_SIZE)) {
    // The batch API body takes "$expand" (not "expand"), and fields cannot be combined
    // with $expand, so request relations only; item ids come back on every result anyway
    const batch = await httpJson(
      `${base}/_apis/wit/workitemsbatch?api-version=7.1`,
      { method: "POST", headers: { ...AUTH_HEADER, "Content-Type": "application/json" }, body: JSON.stringify({ ids: slice, "$expand": "relations" }) }
    );
    for (const wi of (batch.value || [])) {
      const childId = wi.id;
      for (const rel of (wi.relations || [])) {
        if (rel.rel !== "System.LinkTypes.Hierarchy-Reverse" || !rel.url) continue;
        const parentId = parentIdFromUrl(rel.url);
        if (!parentId) continue;
        if (childId < MIN_ID || parentId < MIN_ID) continue;
        const relUrl = `${base}/_apis/wit/workItems/${parentId}`;
        await writeFn(childId, parentId, relUrl);
        links += 1;
      }
    }
    await sleep(SLEEP_BETWEEN_BATCHES_MS);
  }
  return { scanned: ids.length, links };
}
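// For reference, a parent link in a batch item's relations array looks roughly like:
//   { "rel": "System.LinkTypes.Hierarchy-Reverse",
//     "url": "https://dev.azure.com/org/_apis/wit/workItems/1201",
//     "attributes": { "isLocked": false } }
// parentIdFromUrl() takes the trailing path segment, so 1201 becomes the parent id.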
async function main() {
  const started = new Date();
  const client = new Client({ connectionString: normalizePostgresURL(DX_PG_URL), ssl: { rejectUnauthorized: false } });
  await client.connect();
  const { rows: scopes } = await client.query(ONE_READ_SQL);
  if (!scopes.length) {
    console.log("No API-accessible and allowlisted projects found");
    await client.end();
    return;
  }
  const sqlLines = [];
  const writer = buildWriter(DRY_RUN, client, sqlLines);
  let totalLinksProcessed = 0;
  try {
    for (const { organization_name: org, project_name: project, last_ingest } of scopes) {
      if (!DRY_RUN) {
        await client.query("BEGIN");
        await client.query("SET LOCAL statement_timeout = '15s'");
        await client.query("SET LOCAL lock_timeout = '5s'");
        await client.query("SET LOCAL idle_in_transaction_session_timeout = '60s'");
      }
      try {
        if (MODE === "backfill") {
          const stats = await backfillProject(org, project, writer);
          totalLinksProcessed += stats.kept;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[backfill] ${org} / ${project}: total=${stats.total} kept=${stats.kept} dropped=${stats.dropped}`);
        } else if (MODE === "incremental") {
          const since = last_ingest
            ? new Date(last_ingest)
            : new Date(Date.now() - FIRST_RUN_LOOKBACK_DAYS * 86400 * 1000);
          const { scanned, links } = await incrementalProject(org, project, since.toISOString(), writer);
          totalLinksProcessed += links;
          if (!DRY_RUN && writer.flush) await writer.flush();
          if (!DRY_RUN) await client.query("COMMIT");
          console.log(`[incremental] ${org} / ${project}: scanned=${scanned} upserted=${links}`);
        } else {
          throw new Error(`Unknown MODE: ${MODE}`);
        }
      } catch (e) {
        if (!DRY_RUN) { try { await client.query("ROLLBACK"); } catch {} }
        throw e;
      }
      await sleep(SLEEP_BETWEEN_PROJECTS_MS);
    }
    if (DRY_RUN) {
      const header = ["-- Dry run output", "BEGIN;"].join("\n");
      const footer = ["-- Cursor not advanced in dry run", "COMMIT;"].join("\n");
      const file = path.resolve(process.cwd(), OUT_FILE);
      fs.writeFileSync(file, header + "\n" + sqlLines.join("\n") + "\n" + footer + "\n", "utf8");
      console.log(`Dry run file: ${file}`);
    }
  } catch (err) {
    console.error(err.message || String(err));
    process.exitCode = 1;
  } finally {
    await client.end();
    const ended = new Date();
    console.log(`HTTP calls: ${httpCalls}`);
    console.log(`Run time: ${Math.round((ended - started) / 1000)}s`);
    console.log(`Total links processed: ${totalLinksProcessed}`);
  }
}
main();