Node.js module for partial / chunked reads of large JSON, XML, and text files
/**
 * Chunked/Partial Readers for Large JSON, XML, and Text Files
 * ----------------------------------------------------------
 * This module provides functions for efficiently reading and processing
 * very large files (up to hundreds of gigabytes) in a streaming manner,
 * preventing memory overload. It supports JSON, XML, and text formats.
 *
 * Usage:
 *   1. `xml(parser, callback)`: Stream-process XML files line by line.
 *   2. `text(parser, callback)`: Stream-process plain text files line by line.
 *   3. `json(parser, callback)`: Stream-process JSON files using `stream-json`.
 *
 * Each function:
 *   - Validates file existence.
 *   - Reads file content in chunks.
 *   - Passes each line/entry to a custom processor (`parser.proccesor.processLine`).
 *   - Displays periodic statistics and manages back-pressure (pauses/resumes reading).
 *   - Calls a final `parser.proccesor.processingEnd` after all data is processed.
 *
 * The functions rely on:
 *   - `fs.createReadStream` for streaming file content.
 *   - `stream-json` for parsing large JSON files in a memory-friendly way.
 *   - A `proccesor` object (provided via `parser.proccesor`) responsible for:
 *     - `processLine(data, cb)`: Insert or handle each processed line or object.
 *     - `processingEnd(cb)`: Execute any cleanup or final DB operations.
 *     - `createIndexies(cb)`: (Optional) Create DB indexes after inserts.
 *     - `itemCounter`: Count of items fully processed (for back-pressure logic).
 *
 * Dependencies:
 *   - fs (built-in)
 *   - cli-color (npm install cli-color)
 *   - stream-json (npm install stream-json)
 *   - stream-chain (npm install stream-chain)
 *
 * Exports:
 *   - xml, text, json, mils (helper)
 *
 * Example usage pattern (pseudo-code):
 * ----------------------------------------------------------------
 * const { xml, text, json } = require('./streamReaders');
 *
 * // parser.proccesor is a custom object that must have
 * // processLine(), processingEnd(), itemCounter, etc.
 * const parser = {
 *   file: 'path/to/large.xml',   // large.json or large.txt for the others
 *   name: 'proteinatlas',        // optional (affects whitespace trimming in XML)
 *   counter: { lines: 0 },       // line counter
 *   proccesor: { ... },          // your custom processing object
 *   time: { start: Date.now() },
 *   objectlimit: false,          // or a number to stop after N inserted documents
 *   startfromMB: false,          // (text reader only) or an MB offset to skip to
 * };
 *
 * xml(parser, () => {
 *   console.log('XML processing complete!');
 * });
 * ----------------------------------------------------------------
 */
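/*
 * Illustrative sketch (not part of the original gist): one possible shape for
 * the `parser.proccesor` object, batching incoming items and flushing every
 * `perInsert` of them. The `db` handle, collection name, and field names are
 * assumptions for the example only; any store with a bulk insert would do.
 *
 *   const proccesor = {
 *     options: { perInsert: 5000 },  // batch size; also drives back-pressure math
 *     itemCounter: 0,                // incremented for every item handed in
 *     batch: [],
 *     processLine(data, cb) {
 *       this.itemCounter++;
 *       this.batch.push(data);
 *       if (this.batch.length < this.options.perInsert) return cb({ count: 0 });
 *       const docs = this.batch.splice(0);
 *       db.collection("items").insertMany(docs).then(() => cb({ count: docs.length }));
 *     },
 *     processingEnd(cb) {            // flush whatever is left in the final batch
 *       const docs = this.batch.splice(0);
 *       if (docs.length === 0) return cb({ count: 0 });
 *       db.collection("items").insertMany(docs).then(() => cb({ count: docs.length }));
 *     },
 *     createIndexies(cb) {           // optional
 *       db.collection("items").createIndex({ id: 1 }).then(() => cb());
 *     },
 *   };
 */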
const fs = require("fs");
const clc = require("cli-color");

// Stream-JSON dependencies for large JSON streaming
const { parser: jsonParser } = require("stream-json");
const { chain } = require("stream-chain");
const { streamObject } = require("stream-json/streamers/StreamObject");

// Basic console log colors
const error = clc.red.bold,
  warn = clc.yellow,
  notice = clc.blue;
/**
 * Stream-process a large XML file. Reads the file line by line and
 * delegates each <entry>...</entry> block to `parser.proccesor.processLine()`.
 */
const xml = function (parser, callback) {
  let self = this;

  if (!fs.existsSync(parser.file)) {
    console.log(warn("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }

  // Collect all relevant variables in an object for easier cleanup
  this.o = {
    parser: parser,
    _lines: [],
    _lineFragment: "",
    totalBytes: 0,
    currentMB: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
    // Document node delimiters
    nodes: {
      start: "<entry",
      end: "</entry>",
    },
    temp: "", // Holds the text for the current XML entry
    inProcess: null,
  };
  const readable = fs.createReadStream(parser.file, {
    flags: "r",
    encoding: "utf8",
    mode: 0o644,
    highWaterMark: 1024 * 1024, // read in ~1 MB chunks
    autoClose: true,
  });
  // Optionally trim whitespace if parser.name is "proteinatlas"
  this.o.trimWhitespace = (this.o.parser.name === "proteinatlas");

  readable.on("data", function (chunk) {
    let o = self.o;
    o.totalBytes += chunk.length;

    // Split chunk by line endings
    o._lines = o._lines.concat(chunk.toString("utf-8").split(/(?:\n|\r\n|\r)/g));

    // Reattach broken line fragments
    o._lines[0] = o._lineFragment + o._lines[0];
    o._lineFragment = o._lines.pop() || "";

    while (o._lines.length !== 0) {
      let line = o._lines.shift();

      if (o.trimWhitespace) {
        line = line.trimStart();
      }

      // Detect node start / end
      if (line.substring(0, o.nodes.start.length) === o.nodes.start) {
        o.inProcess = true;
      } else if (line.substring(0, o.nodes.end.length) === o.nodes.end) {
        o.inProcess = false;
      }

      // When we've reached the end of a node, process it
      if (o.inProcess === false && o.temp !== "") {
        o.temp += line;
        o.parser.proccesor.processLine(o.temp, function (res) {
          o.insertedDocuments += res.count || 0;
        });
        o.temp = "";
      }
      // Otherwise accumulate lines within the XML node
      else if (o.inProcess === true) {
        o.temp += line;
      }

      // Stats: increment line count
      o.parser.counter.lines++;

      // Periodic stats display
      if (o.parser.counter.lines % (o.perInsert * 2) === 0) {
        o.type = 1;
        stats(o);
      }

      // Back-pressure check: pause if the processed/inserted gap is too large
      o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
      if (o.diff >= o.perInsert * 3) {
        readable.pause();
        const interval = setInterval(function () {
          o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
          o.type = 2;
          stats(o);
          if (o.diff <= o.perInsert) {
            clearInterval(interval);
            readable.resume();
          }
        }, 1000);
        break;
      }

      // Limit processing if objectlimit is set
      if (o.parser.objectlimit !== false) {
        if (o.insertedDocuments >= o.parser.objectlimit) {
          readable.destroy();
        }
      }
    }
  });
  // When file reading is done, wait for final inserts and run cleanup
  readable.on("end", function () {
    let o = self.o;
    const interval = setInterval(function () {
      o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        o.parser.proccesor.processingEnd(function (res) {
          o.insertedDocuments += res.count;
          o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);

          // Create indexes if required
          const total = parseInt(o.insertedDocuments);
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }

          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);

              // Time stats
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments : ") + notice(total));
              console.log(warn("Difference: ") + notice(o.diff));
              console.log(warn("----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("Time (min): ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
};
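/*
 * Illustrative note (not part of the original gist): how the readers stitch
 * lines back together across 1 MB chunk boundaries. The last element produced
 * by the split is usually a partial line, so it is popped into `_lineFragment`
 * and prepended to the first line of the next chunk. The values below are
 * made up for the example:
 *
 *   let buffered = [];
 *   let fragment = "";
 *   const out = [];
 *   for (const chunk of ["<entry>\n<accession>P0DTC2</acc", "ession>\n</entry>\n"]) {
 *     buffered = buffered.concat(chunk.split(/(?:\n|\r\n|\r)/g));
 *     buffered[0] = fragment + buffered[0];
 *     fragment = buffered.pop() || "";
 *     while (buffered.length) out.push(buffered.shift());
 *   }
 *   // out: ["<entry>", "<accession>P0DTC2</accession>", "</entry>"]
 */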
/**
 * Stream-process a large plain-text file. Reads the file line by line
 * and delegates each line to `parser.proccesor.processLine()`.
 */
const text = function (parser, callback) {
  let self = this;

  if (!fs.existsSync(parser.file)) {
    console.log(warn("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }

  this.o = {
    parser: parser,
    _lines: [],
    _lineFragment: "",
    totalBytes: 0,
    currentMB: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
  };
  const readable = fs.createReadStream(parser.file, {
    flags: "r",
    encoding: "utf8",
    mode: 0o644,
    highWaterMark: 1024 * 1024, // read in ~1 MB chunks
    autoClose: true,
  });
  readable.on("data", function (chunk) {
    let o = self.o;
    o.totalBytes += chunk.length;

    // Split by newlines
    o._lines = o._lines.concat(chunk.toString("utf-8").split(/(?:\n|\r\n|\r)/g));

    // Reattach broken lines
    o._lines[0] = o._lineFragment + o._lines[0];
    o._lineFragment = o._lines.pop() || "";

    while (o._lines.length !== 0) {
      const line = o._lines.shift();
      o.parser.counter.lines++;

      // Optional offset: if startfromMB is set, skip lines until that offset is reached
      if (!parser.startfromMB || o.currentMB >= parser.startfromMB) {
        o.parser.proccesor.processLine(line, function (res) {
          o.insertedDocuments += res.count || 0;
        });
      }

      // Periodic stats
      if (o.parser.counter.lines % (o.perInsert / 100) === 0) {
        o.type = 1;
        stats(o);
      }

      // Back-pressure check
      o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
      if (o.diff >= o.perInsert * 3) {
        readable.pause();
        const interval = setInterval(function () {
          o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
          o.type = 2;
          stats(o);
          if (o.diff <= o.perInsert) {
            clearInterval(interval);
            readable.resume();
          }
        }, 1000);
        break;
      }

      // Objectlimit check
      if (o.parser.objectlimit !== false) {
        if (o.insertedDocuments >= o.parser.objectlimit) {
          readable.destroy();
        }
      }
    }
  });
  // When file reading finishes, finalize insertion
  readable.on("end", function () {
    let o = self.o;
    const interval = setInterval(function () {
      o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        console.log(warn("\r\nParser: processingEnd start.."));
        o.parser.proccesor.processingEnd(function (res) {
          console.log(warn("Parser: processingEnd Finished!"));
          o.insertedDocuments += res.count;
          o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);

          // Index creation if available
          const total = parseInt(o.insertedDocuments);
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }

          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments : ") + notice(total));
              console.log(warn("Difference: ") + notice(o.diff));
              console.log(warn("----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("Time (min): ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
};
/**
 * Stream-process a large JSON file. Uses `stream-json` and `streamObject`
 * to parse the JSON in small chunks, calling `parser.proccesor.processLine()`
 * for each processed object.
 */
const json = function (parser, callback) {
  let self = this;

  if (!fs.existsSync(parser.file)) {
    console.log(warn("\r\nERROR: ") + notice("File: " + parser.file + " doesn't exist, skipping!\r\n"));
    return callback();
  }

  this.o = {
    parser: parser,
    buffer: "",
    totalBytes: 0,
    insertedDocuments: 0,
    stats: fs.statSync(parser.file),
    diff: 0,
    perInsert: parser.proccesor.options.perInsert,
  };

  // stream-json chain for object-mode streaming
  const readable = chain([
    fs.createReadStream(parser.file),
    jsonParser(),
    streamObject(),
  ]);
  readable.on("data", (data) => {
    let o = self.o;
    o.totalBytes += JSON.stringify(data).length;
    o.parser.counter.lines++;

    o.parser.proccesor.processLine(data, (res) => {
      o.insertedDocuments += parseInt(res.count) || 0;
    });

    // Periodic stats
    if (o.parser.counter.lines % (o.perInsert / 100) === 0) {
      o.type = 1;
      stats(o);
    }

    // Back-pressure check
    o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
    if (o.diff >= o.perInsert * 3) {
      readable.pause();
      const interval = setInterval(function () {
        o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
        o.type = 2;
        stats(o);
        if (o.diff <= o.perInsert) {
          clearInterval(interval);
          readable.resume();
        }
      }, 1000);
    }

    // Limit objects if objectlimit is set
    if (o.parser.objectlimit !== false) {
      if (o.insertedDocuments >= o.parser.objectlimit) {
        readable.destroy();
      }
    }
  });
  // Finish and run final processing
  readable.on("end", () => {
    let o = self.o;
    const interval = setInterval(function () {
      o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);
      o.type = 3;
      stats(o);
      if (o.diff <= o.perInsert) {
        clearInterval(interval);
        o.parser.proccesor.processingEnd(function (res) {
          o.insertedDocuments += res.count;
          o.diff = parseInt(o.parser.proccesor.itemCounter - o.insertedDocuments);

          // Create indexes if needed
          const total = parseInt(o.insertedDocuments);
          let index = false;
          if (typeof o.parser.proccesor.createIndexies === "function") {
            console.log(warn("\r\nCreating indexes! This can take a while..."));
            index = true;
            o.parser.proccesor.createIndexies(function () {
              index = false;
            });
          }

          const iv = setInterval(function () {
            if (index === false) {
              clearInterval(iv);
              const milliseconds = new Date().getTime() - o.parser.time.start;
              const timePassed = millisToMin(milliseconds);
              console.log(warn("\r\nDocuments : ") + notice(total));
              console.log(warn("Difference: ") + notice(o.diff));
              console.log(warn("----Lines : ") + notice(o.parser.counter.lines));
              console.log(warn("Time (min): ") + error(timePassed));
              return callback();
            }
          }, 500);
        });
      }
    }, 1500);
  });
  readable.on("error", (err) => {
    console.error("An error occurred while processing the JSON file: " + parser.file);
    console.log(err);
    console.log("Documents inserted before the error: " + self.o.insertedDocuments);
  });
};
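/*
 * Illustrative note (not part of the original gist): with the chain above,
 * `streamObject()` expects the file's top-level value to be a single JSON
 * object and emits one { key, value } pair per top-level property, so
 * `processLine` receives objects shaped roughly like:
 *
 *   // file:        { "P12345": { "name": "..." }, "P67890": { "name": "..." } }
 *   // data events: { key: "P12345", value: { name: "..." } }, ...
 *
 * For files whose top-level value is a JSON array,
 * `stream-json/streamers/StreamArray` is the analogous streamer and emits
 * { key: <index>, value: <element> } pairs instead.
 */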
/**
 * Helper function to display streaming stats in the console.
 * Called periodically or on certain events (e.g., back-pressure).
 */
const stats = function (o) {
  o.currentMB = (o.totalBytes / 1000000).toFixed(2);
  const remainingMB = (o.stats.size / 1000000 - o.currentMB).toFixed(2);
  const currentTime = new Date().getTime();
  const milliseconds = parseInt(currentTime - o.parser.time.start);
  const seconds = Math.floor((milliseconds % 60000) / 1000);
  const passed = millisToMin(milliseconds);
  const eta = Math.round((remainingMB / o.currentMB) * passed);
  const inserts = Math.round(o.insertedDocuments / o.perInsert);

  process.stdout.clearLine(0);
  process.stdout.cursorTo(0);
  process.stdout.write(
    warn("Type: ") + error(o.type) +
    warn(" Time Passed (min): ") + notice(passed) +
    " " + warn(" (sec): ") + error(seconds) +
    warn(" Line: ") + notice(o.parser.counter.lines) +
    warn(" Inserted: ") + notice(o.insertedDocuments) +
    warn(" Current (MB): ") + notice(o.currentMB) +
    " " + warn(" Remaining (MB): ") + notice(remainingMB) + " MB" +
    warn(" ETA (min): ") + notice(eta) + " " +
    warn(" Diff (processed/inserted): ") + error(o.diff) +
    " " + warn(" Per/Insert: ") + error(o.perInsert) +
    " " + warn(" Inserts (total): ") + error(inserts) + "\r"
  );
};
/**
 * Converts milliseconds to integer minutes.
 */
const millisToMin = function (millis) {
  return Math.floor(millis / 60000);
};
/**
 * (Unused example) A helper function to track state if you want
 * to parse JSON lines manually. Not critical to main usage.
 */
function updateJSONState(state, line) {
  if (state === null && line.trim().startsWith("{")) {
    return "start";
  } else if (state === "start" && line.trim().endsWith("}")) {
    return "end";
  }
  return state;
}
module.exports = {
  xml: xml,
  text: text,
  json: json,
  mils: millisToMin,
};