Node.js module for partial/chunked reads of large JSON, XML, and text files
/**
 * Chunked/Partial Readers for Large JSON, XML, and Text Files
 * ----------------------------------------------------------
 * This module provides functions for efficiently reading and processing
 * very large files (up to hundreds of gigabytes) in a streaming manner,
 * preventing memory overload. It supports JSON, XML, and text formats.
 *
 * Usage:
 *   1. `xml(parser, callback)`:  Stream-process XML files line by line.
 *   2. `text(parser, callback)`: Stream-process plain text files line by line.
 *   3. `json(parser, callback)`: Stream-process JSON files using `stream-json`.
 *
 * Each function:
 *   - Validates file existence.
 *   - Reads file content in chunks.
 *   - Passes each line/entry to a custom processor (`parser.proccesor.processLine`).
 *   - Displays periodic statistics and manages back-pressure (pauses/resumes reading).
 *   - Calls a final `parser.proccesor.processingEnd` after all data is processed.
 *
 * The functions rely on:
 *   - `fs.createReadStream` for streaming file content.
 *   - `stream-json` for parsing large JSON files in a memory-friendly way.
 *   - A `proccesor` object (provided via `parser.proccesor`) responsible for:
 *       - `processLine(data, cb)`: Insert or handle each processed line or object.
 *       - `processingEnd(cb)`:     Execute any cleanup or final DB operations.
 *       - `createIndexies(cb)`:    (Optional) Create DB indexes after inserts.
 *       - `itemCounter`:           Count of items fully processed (for back-pressure logic).
 *
 * Dependencies:
 *   - fs (built-in)
 *   - cli-color   (npm install cli-color)
 *   - stream-json (npm install stream-json)
 *
 * Exports:
 *   - xml, text, json, mils (alias for the millisToMin helper)
 *
 * Example usage pattern (pseudo-code):
 * ----------------------------------------------------------------
 *   const { xml, text, json } = require('./streamReaders');
 *
 *   // parser.proccesor is a custom object that must have
 *   // processLine(), processingEnd(), itemCounter, etc.
 *   const parser = {
 *     file: 'path/to/large.xml',   // large.json or large.txt for the others
 *     name: 'proteinatlas',        // optional (enables whitespace trimming in XML)
 *     counter: { lines: 0 },       // line counter
 *     proccesor: { ... },          // your custom processing object
 *     time: { start: Date.now() },
 *     objectlimit: false,
 *   };
 *
 *   xml(parser, () => {
 *     console.log('XML processing complete!');
 *   });
 * ----------------------------------------------------------------
 */
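/*
 * Hedged sketch (not part of the original module): a minimal in-memory
 * `proccesor` satisfying the contract described above. The spellings
 * (`proccesor`, `createIndexies`) follow the module; the batch size and the
 * array store below are illustrative assumptions only.
 *
 *   const proccesor = {
 *     options: { perInsert: 1000 }, // batch size driving stats/back-pressure
 *     itemCounter: 0,               // items handed in so far (read by the readers)
 *     _store: [],                   // stand-in for a real database
 *     processLine(data, cb) {
 *       this.itemCounter++;
 *       this._store.push(data);     // stand-in for a real DB insert
 *       cb({ count: 1 });           // report how many documents were persisted
 *     },
 *     processingEnd(cb) {
 *       cb({ count: 0 });           // flush any pending batch, then report
 *     },
 *     createIndexies(cb) { cb(); }, // optional: build DB indexes after inserts
 *   };
 */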
const fs = require("fs");
const clc = require("cli-color");
// Stream-JSON dependencies for large JSON streaming
const { parser: jsonParser } = require("stream-json");
const { chain } = require("stream-chain");
const { streamObject } = require("stream-json/streamers/StreamObject");
// Basic console log colors
const error = clc.red.bold;
const warn = clc.yellow;
const notice = clc.blue;
/**
 * Stream-process a large XML file. Reads the file line by line and
 * delegates each <entry>...</entry> block to `parser.proccesor.processLine()`.
 */
const xml = function (parser, callback) {
  // Local state holder (avoids sharing `this` between concurrent calls)
  const self = {};
  if (!fs.existsSync(parser.file)) {
    console.log(error("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }
  // Collect all relevant variables in an object for easier cleanup
  self.o = {
    parser: parser,
    _lines: [],
    _lineFragment: "",
    totalBytes: 0,
    currentMB: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
    // Document node delimiters
    nodes: {
      start: "<entry",
      end: "</entry>",
    },
    temp: "", // Holds the text for the current XML entry
    inProcess: null,
  };
  const readable = fs.createReadStream(parser.file, {
    flags: "r",
    encoding: "utf8",
    autoClose: true,
    highWaterMark: 1024 * 1024, // 1 MB chunks ("bufferSize" is not a valid option)
  });
  // Optionally trim whitespace if parser.name is "proteinatlas"
  self.o.trimWhitespace = (self.o.parser.name === "proteinatlas");
readable.on("data", function (chunk) {
let o = self.o;
o.totalBytes += chunk.length;
// Split chunk by line endings
o._lines = o._lines.concat(chunk.toString("utf-8").split(/(?:\n|\r\n|\r)/g));
// Reattach broken line fragments
o._lines[0] = o._lineFragment + o._lines[0];
o._lineFragment = o._lines.pop() || "";
while (o._lines.length !== 0) {
let line = o._lines.shift();
if (o.trimWhitespace) {
line = line.trimStart();
}
// Detect node start / end
if (line.substring(0, o.nodes.start.length) === o.nodes.start) {
o.inProcess = true;
} else if (line.substring(0, o.nodes.end.length) === o.nodes.end) {
o.inProcess = false;
}
// When we've reached the end of a node, process it
if (o.inProcess === false && o.temp !== "") {
o.temp += line;
o.parser.proccesor.processLine(o.temp, function (res) {
o.insertedDocuments += res.count || 0;
});
o.temp = "";
}
// Otherwise accumulate lines within the XML node
else if (o.inProcess === true) {
o.temp += line;
}
// Stats: increment line count
parser.counter.lines++;
// Periodic stats display
if (o.parser.counter.lines % (o.perInsert * 2) === 0) {
o.type = 1;
stats(o);
}
// Back-pressure check: Pause if difference is too large
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
if (o.diff >= o.perInsert * 3) {
readable.pause();
const interval = setInterval(function () {
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
o.type = 2;
stats(o);
if (o.diff <= o.perInsert) {
clearInterval(interval);
readable.resume();
}
}, 1000);
break;
}
// Limit processing if objectlimit is set
if (o.parser.objectlimit !== false) {
if (o.insertedDocuments >= o.parser.objectlimit) {
readable.destroy();
readable.end(null);
}
}
}
});
  // When file reading is done, wait for final inserts and run cleanup
  readable.on("end", function () {
    const o = self.o;
    const interval = setInterval(function () {
      o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        o.parser.proccesor.processingEnd(function (res) {
          o.insertedDocuments += res.count || 0;
          o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
          // Create indexes if required
          const total = o.insertedDocuments;
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }
          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);
              // Time stats
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments  : ") + notice(total));
              console.log(warn("Difference : ") + notice(o.diff));
              console.log(warn("-----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("------Time : ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
};
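/*
 * Usage note for xml(): the reader assumes one record per
 * <entry>...</entry> block, with each tag starting its own line; this matches
 * dumps such as UniProt or Human Protein Atlas XML. For other record elements,
 * adjust `nodes.start` / `nodes.end` inside xml(), e.g. "<record" / "</record>"
 * (hypothetical names, shown for illustration only).
 */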
/**
 * Stream-process a large plain-text file. Reads the file line by line
 * and delegates each line to `parser.proccesor.processLine()`.
 */
const text = function (parser, callback) {
  // Local state holder (avoids sharing `this` between concurrent calls)
  const self = {};
  if (!fs.existsSync(parser.file)) {
    console.log(error("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }
  self.o = {
    parser: parser,
    _lines: [],
    _lineFragment: "",
    totalBytes: 0,
    currentMB: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
  };
  const readable = fs.createReadStream(parser.file, {
    flags: "r",
    encoding: "utf8",
    autoClose: true,
    highWaterMark: 1024 * 1024, // 1 MB chunks ("bufferSize" is not a valid option)
  });
readable.on("data", function (chunk) {
let o = self.o;
o.totalBytes += chunk.length;
// Split by newlines
o._lines = o._lines.concat(chunk.toString("utf-8").split(/(?:\n|\r\n|\r)/g));
// Reattach broken lines
o._lines[0] = o._lineFragment + o._lines[0];
o._lineFragment = o._lines.pop() || "";
while (o._lines.length !== 0) {
const line = o._lines.shift();
o.parser.counter.lines++;
// Optional offset: If startfromMB is set, skip lines until that offset
if (parser.startfromMB === false || o.currentMB >= parser.startfromMB) {
o.parser.proccesor.processLine(line, function (res) {
o.insertedDocuments += res.count || 0;
});
}
// Periodic stats
if (o.parser.counter.lines % (o.perInsert / 100) === 0) {
o.type = 1;
stats(o);
}
// Back-pressure check
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
if (o.diff >= o.perInsert * 3) {
readable.pause();
const interval = setInterval(function () {
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
o.type = 2;
stats(o);
if (o.diff <= o.perInsert) {
clearInterval(interval);
readable.resume();
}
}, 1000);
break;
}
// Objectlimit check
if (o.parser.objectlimit !== false) {
if (o.insertedDocuments >= o.parser.objectlimit) {
readable.destroy();
readable.end(null);
}
}
}
});
  // When file reading finishes, finalize insertion
  readable.on("end", function () {
    const o = self.o;
    const interval = setInterval(function () {
      o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        console.log(warn("\r\nParser: processingEnd start.."));
        o.parser.proccesor.processingEnd(function (res) {
          console.log(warn("Parser: processingEnd finished!"));
          o.insertedDocuments += res.count || 0;
          o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
          // Index creation if available
          const total = o.insertedDocuments;
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }
          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments  : ") + notice(total));
              console.log(warn("Difference : ") + notice(o.diff));
              console.log(warn("-----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("------Time : ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
};
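/*
 * Usage sketch for text() (the parser fields are the ones the function reads;
 * the file path, offset, and `myProccesor` name below are illustrative):
 *
 *   const parser = {
 *     file: "path/to/large.txt",
 *     counter: { lines: 0 },
 *     time: { start: Date.now() },
 *     objectlimit: false,
 *     startfromMB: false,     // or e.g. 500 to skip lines before the ~500 MB mark
 *     proccesor: myProccesor, // an object matching the contract sketched above
 *   };
 *   text(parser, () => console.log("Text processing complete!"));
 */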
/**
 * Stream-process a large JSON file. Uses `stream-json` and `streamObject`
 * to parse the JSON in small chunks, calling `parser.proccesor.processLine()`
 * for each processed object.
 */
const json = function (parser, callback) {
  // Local state holder (avoids sharing `this` between concurrent calls)
  const self = {};
  if (!fs.existsSync(parser.file)) {
    console.log(error("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }
  self.o = {
    parser: parser,
    buffer: "",
    totalBytes: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
  };
  // stream-json chain for object-mode streaming
  const readable = chain([
    fs.createReadStream(parser.file),
    jsonParser(),
    streamObject(),
  ]);
readable.on("data", (data) => {
let o = self.o;
o.totalBytes += JSON.stringify(data).length;
o.parser.counter.lines++;
o.parser.proccesor.processLine(data, (res) => {
o.insertedDocuments += parseInt(res.count) || 0;
});
// Periodic stats
if (o.parser.counter.lines % (o.perInsert / 100) === 0) {
o.type = 1;
stats(o);
}
// Back-pressure check
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
if (o.diff >= o.perInsert * 3) {
readable.pause();
const interval = setInterval(function () {
o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
o.type = 2;
stats(o);
if (o.diff <= o.perInsert) {
clearInterval(interval);
readable.resume();
}
}, 1000);
}
// Limit objects if objectlimit is set
if (o.parser.objectlimit !== false) {
if (o.insertedDocuments >= o.parser.objectlimit) {
readable.destroy();
readable.end(null);
}
}
});
  // Finish and run final processing
  readable.on("end", () => {
    const o = self.o;
    const interval = setInterval(function () {
      o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        o.parser.proccesor.processingEnd(function (res) {
          o.insertedDocuments += res.count || 0;
          o.diff = o.parser.proccesor.itemCounter - o.insertedDocuments;
          // Create indexes if needed
          const total = o.insertedDocuments;
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }
          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments  : ") + notice(total));
              console.log(warn("Difference : ") + notice(o.diff));
              console.log(warn("-----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("------Time : ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
  readable.on("error", (err) => {
    console.error("An error occurred while processing the JSON file: " + parser.file);
    console.log(err);
    console.log(self.o.insertedDocuments);
  });
};
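/*
 * Usage note for json(): `streamObject` expects the file's top level to be a
 * single JSON object, and emits one { key, value } pair per top-level
 * property. So for a file like
 *
 *   { "P12345": { ... }, "P67890": { ... } }
 *
 * processLine() receives { key: "P12345", value: {...} }, then
 * { key: "P67890", value: {...} }. For a top-level JSON array, use
 * `streamArray` from "stream-json/streamers/StreamArray" in the chain instead.
 */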
/**
 * Helper function to display streaming stats in the console.
 * Called periodically or on certain events (e.g., back-pressure).
 */
const stats = function (o) {
  o.currentMB = o.totalBytes / 1000000;
  const remainingMB = o.stats.size / 1000000 - o.currentMB;
  const currentTime = new Date().getTime();
  const milliseconds = currentTime - o.parser.time.start;
  const seconds = Math.floor((milliseconds % 60000) / 1000);
  const passed = millisToMin(milliseconds);
  const eta = Math.round((remainingMB / o.currentMB) * passed);
  const inserts = Math.round(o.insertedDocuments / o.perInsert);
  process.stdout.clearLine(0);
  process.stdout.cursorTo(0);
  process.stdout.write(
    warn("Type: ") + error(o.type) +
    warn(" Time Passed (min): ") + notice(passed) +
    " " + warn(" (sec): ") + error(seconds) +
    warn(" Line: ") + notice(o.parser.counter.lines) +
    warn(" Inserted: ") + notice(o.insertedDocuments) +
    warn(" Current (MB): ") + notice(o.currentMB.toFixed(2)) +
    " " + warn(" Remaining (MB): ") + notice(remainingMB.toFixed(2)) + " MB" +
    warn(" ETA (min): ") + notice(eta) + " " +
    warn(" Diff (processed/inserted): ") + error(o.diff) +
    " " + warn(" Per/Insert: ") + error(o.perInsert) +
    " " + warn(" Inserts (total): ") + error(inserts) + "\r"
  );
};
/**
 * Converts milliseconds to integer minutes.
 */
const millisToMin = function (millis) {
  return Math.floor(millis / 60000);
};
/**
 * (Unused example) A helper function to track state if you want
 * to parse JSON lines manually. Not critical to main usage.
 */
function updateJSONState(state, line) {
  if (state === null && line.trim().startsWith("{")) {
    return "start";
  } else if (state === "start" && line.trim().endsWith("}")) {
    return "end";
  }
  return state;
}
module.exports = {
  xml: xml,
  text: text,
  json: json,
  mils: millisToMin,
};