Created
August 25, 2024 07:54
-
-
Save ajeetkumarrauniyar/b9358b1aafdd5d2c2c17846b0e826de0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ``` | |
| node-assignment/ | |
| app.js | |
| cluster.js | |
| queue.js | |
| task.js | |
| logger.js | |
| package.json | |
| redis.conf | |
| log.txt | |
| README.md | |
| ``` | |
| ### app.js: | |
| ``` | |
| const express = require('express'); | |
| const app = express(); | |
| const cluster = require('./cluster'); | |
| const queue = require('./queue'); | |
| const task = require('./task'); // Import the task function | |
| app.use(express.json()); | |
| app.post('/api/v1/task', async (req, res) => { | |
| try { | |
| const userId = req.body.user_id; | |
| await queue.addTask(userId, async () => { | |
| try { | |
| await task(userId); | |
| res.status(200).send(`Task completed for user ${userId}`); | |
| } catch (error) { | |
| console.error(error); | |
| res.status(500).send(`Error completing task for user ${userId}`); | |
| } | |
| }); | |
| } catch (error) { | |
| console.error(error); | |
| res.status(500).send(`Error adding task to queue for user ${userId}`); | |
| } | |
| }); | |
| cluster.setupCluster(app, 2); | |
| ``` | |
| ### cluster.js: | |
| ``` | |
| const cluster = require('cluster'); | |
| const numCPUs = require('os').cpus().length; | |
| module.exports.setupCluster = (app, numWorkers, port = 3000) => { | |
| if (numWorkers > numCPUs) { | |
| console.warn(`Number of workers (${numWorkers}) is greater than number of CPUs (${numCPUs}).`); | |
| } | |
| if (cluster.isMaster) { | |
| console.log(`Master ${process.pid} is running`); | |
| // Fork workers. | |
| for (let i = 0; i < numWorkers; i++) { | |
| cluster.fork(); | |
| } | |
| cluster.on('exit', (worker, code, signal) => { | |
| console.log(`worker ${worker.process.pid} died`); | |
| cluster.fork(); // Restart the worker | |
| }); | |
| } else { | |
| try { | |
| app.listen(port, () => { | |
| console.log(`Worker ${process.pid} started on port ${port}`); | |
| }); | |
| } catch (error) { | |
| console.error(`Error starting worker ${process.pid}: ${error}`); | |
| process.exit(1); | |
| } | |
| } | |
| }; | |
| ``` | |
| ### queue.js: | |
| ``` | |
| const redis = require('redis'); | |
| const client = redis.createClient({ host: 'localhost', port: 6379 }); | |
| const queue = {}; | |
| module.exports.addTask = async (userId, taskFn) => { | |
| try { | |
| const key = `task:queue:${userId}`; | |
| const rateLimitKey = `task:rateLimit:${userId}`; | |
| // Check rate limit | |
| const rateLimit = await client.get(rateLimitKey); | |
| if (rateLimit && rateLimit >= 20) { | |
| // Queue task if rate limit exceeded | |
| await client.rpush(key, JSON.stringify({ taskFn })); | |
| return; | |
| } | |
| // Process task immediately if rate limit not exceeded | |
| await taskFn(); | |
| // Update rate limit | |
| await client.incr(rateLimitKey); | |
| await client.expire(rateLimitKey, 60); // expire in 1 minute | |
| } catch (error) { | |
| console.error(`Error adding task to queue: ${error}`); | |
| } | |
| }; | |
| module.exports.processQueue = async () => { | |
| try { | |
| const queues = await client.keys('task:queue:*'); | |
| await Promise.all(queues.map(async (queueKey) => { | |
| const tasks = await client.lrange(queueKey, 0, -1); | |
| await Promise.all(tasks.map(async (task) => { | |
| const { taskFn } = JSON.parse(task); | |
| await taskFn(); | |
| await client.lrem(queueKey, 1, task); | |
| })); | |
| })); | |
| } catch (error) { | |
| console.error(`Error processing queue: ${error}`); | |
| } | |
| }; | |
| ``` | |
| ### task.js: | |
| ``` | |
| const logger = require('./logger'); | |
| async function completeTask(userId) { | |
| try { | |
| logger.info(`${userId}-task completed at-${Date.now()}`); | |
| await logger.logTaskCompletion(userId); | |
| } catch (error) { | |
| logger.error(`Error completing task for user ${userId}: ${error}`); | |
| } | |
| } | |
| module.exports = completeTask; | |
| ``` | |
| ### logger.js: | |
| ``` | |
| const winston = require('winston'); | |
| const logger = winston.createLogger({ | |
| level: 'info', | |
| format: winston.format.json(), | |
| transports: [ | |
| new winston.transports.File({ filename: 'log.txt' }), | |
| ], | |
| }); | |
| async function logTaskCompletion(userId) { | |
| logger.info(`${userId}-task completed at-${Date.now()}`); | |
| } | |
| module.exports.logTaskCompletion = logTaskCompletion; | |
| ``` | |
| ### package.json: | |
| ``` | |
| { | |
| "name": "node-assignment", | |
| "version": "1.0.0", | |
| "dependencies": { | |
| "express": "^4.17.1", | |
| "redis": "^3.1.2", | |
| "cluster": "^0.7.7" | |
| }, | |
| "scripts": { | |
| "start": "node app.js" | |
| } | |
| } | |
| ``` | |
| ### redis.conf: | |
| ``` | |
| port 6379 | |
| ``` | |
| ### log.txt: | |
| ``` | |
| // empty file for logging task completions | |
| ``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment