Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save ajeetkumarrauniyar/b9358b1aafdd5d2c2c17846b0e826de0 to your computer and use it in GitHub Desktop.

Select an option

Save ajeetkumarrauniyar/b9358b1aafdd5d2c2c17846b0e826de0 to your computer and use it in GitHub Desktop.
```
node-assignment/
app.js
cluster.js
queue.js
task.js
logger.js
package.json
redis.conf
log.txt
README.md
```
### app.js:
```
const express = require('express');
const app = express();
const cluster = require('./cluster');
const queue = require('./queue');
const task = require('./task'); // Import the task function
app.use(express.json());
app.post('/api/v1/task', async (req, res) => {
try {
const userId = req.body.user_id;
await queue.addTask(userId, async () => {
try {
await task(userId);
res.status(200).send(`Task completed for user ${userId}`);
} catch (error) {
console.error(error);
res.status(500).send(`Error completing task for user ${userId}`);
}
});
} catch (error) {
console.error(error);
res.status(500).send(`Error adding task to queue for user ${userId}`);
}
});
cluster.setupCluster(app, 2);
```
### cluster.js:
```
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
module.exports.setupCluster = (app, numWorkers, port = 3000) => {
if (numWorkers > numCPUs) {
console.warn(`Number of workers (${numWorkers}) is greater than number of CPUs (${numCPUs}).`);
}
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
cluster.fork(); // Restart the worker
});
} else {
try {
app.listen(port, () => {
console.log(`Worker ${process.pid} started on port ${port}`);
});
} catch (error) {
console.error(`Error starting worker ${process.pid}: ${error}`);
process.exit(1);
}
}
};
```
### queue.js:
```
const redis = require('redis');
const client = redis.createClient({ host: 'localhost', port: 6379 });
const queue = {};
module.exports.addTask = async (userId, taskFn) => {
try {
const key = `task:queue:${userId}`;
const rateLimitKey = `task:rateLimit:${userId}`;
// Check rate limit
const rateLimit = await client.get(rateLimitKey);
if (rateLimit && rateLimit >= 20) {
// Queue task if rate limit exceeded
await client.rpush(key, JSON.stringify({ taskFn }));
return;
}
// Process task immediately if rate limit not exceeded
await taskFn();
// Update rate limit
await client.incr(rateLimitKey);
await client.expire(rateLimitKey, 60); // expire in 1 minute
} catch (error) {
console.error(`Error adding task to queue: ${error}`);
}
};
module.exports.processQueue = async () => {
try {
const queues = await client.keys('task:queue:*');
await Promise.all(queues.map(async (queueKey) => {
const tasks = await client.lrange(queueKey, 0, -1);
await Promise.all(tasks.map(async (task) => {
const { taskFn } = JSON.parse(task);
await taskFn();
await client.lrem(queueKey, 1, task);
}));
}));
} catch (error) {
console.error(`Error processing queue: ${error}`);
}
};
```
### task.js:
```
const logger = require('./logger');
async function completeTask(userId) {
try {
logger.info(`${userId}-task completed at-${Date.now()}`);
await logger.logTaskCompletion(userId);
} catch (error) {
logger.error(`Error completing task for user ${userId}: ${error}`);
}
}
module.exports = completeTask;
```
### logger.js:
```
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'log.txt' }),
],
});
async function logTaskCompletion(userId) {
logger.info(`${userId}-task completed at-${Date.now()}`);
}
module.exports.logTaskCompletion = logTaskCompletion;
```
### package.json:
```
{
"name": "node-assignment",
"version": "1.0.0",
"dependencies": {
"express": "^4.17.1",
"redis": "^3.1.2",
"cluster": "^0.7.7"
},
"scripts": {
"start": "node app.js"
}
}
```
### redis.conf:
```
port 6379
```
### log.txt:
```
// empty file for logging task completions
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment