Created
August 7, 2025 13:39
-
-
Save asimsolehria/9c11c081566c4c0bcb8de8239fe70956 to your computer and use it in GitHub Desktop.
A queuing mechanism for Celery tasks that prioritizes them by factors such as waiting time and user tier (user_tier).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| from datetime import datetime | |
| from celery import Celery | |
| from collections import deque | |
# Celery application. The broker URL targets a local Redis instance; the
# original author notes that RabbitMQ works as well (and is their own choice).
app = Celery(
    'priority_queue',
    broker='redis://localhost:6379/0',
)

# Pending tasks in arrival order. Each entry is the dict built by add_task(),
# with the keys task_id, user_tier, processing_time and enqueue_time.
# NOTE(review): this deque lives in the memory of whichever process imported
# the module, so it is presumably not shared between the caller and Celery
# worker processes -- confirm this is acceptable for the intended deployment.
task_queue = deque()
# Priority configuration.
# Multipliers applied by calculate_priority() to the two time-based terms.
WEIGHTS = {
    "waiting": 1.5,     # points gained per second spent waiting in the queue
    "processing": 1.0,  # points lost per second of declared processing time
}

# Starting score for a task, looked up by the task's user_tier.
BASE_SCORE = {
    "paid": 100,
    "free": 50,
}
def calculate_priority(task):
    """Score a queued task; get_next_task() picks the highest score first.

    The score is the tier's base score, plus the seconds elapsed since the
    task's ``enqueue_time`` scaled by ``WEIGHTS['waiting']``, minus the
    task's ``processing_time`` scaled by ``WEIGHTS['processing']``.

    Args:
        task: Mapping with the keys 'enqueue_time' (a datetime that can be
            subtracted from ``datetime.now()``), 'processing_time' and
            'user_tier'.

    Returns:
        The numeric priority score.

    Raises:
        KeyError: If ``task`` lacks one of the keys above, or its tier has
            no entry in ``BASE_SCORE``.
    """
    seconds_waited = (datetime.now() - task['enqueue_time']).total_seconds()
    declared_cost = task['processing_time']
    tier = task['user_tier']

    tier_base = BASE_SCORE[tier]
    aging_bonus = WEIGHTS['waiting'] * seconds_waited
    cost_penalty = WEIGHTS['processing'] * declared_cost

    # Long-waiting tasks climb in priority; expensive tasks are pushed back.
    return tier_base + aging_bonus - cost_penalty
def get_next_task():
    """Remove and return the queued task with the highest priority score.

    Returns:
        The task dict for which ``calculate_priority`` is largest, or
        ``None`` when the queue is empty. When several tasks share the top
        score, the one closest to the front of the queue wins.
    """
    if not task_queue:
        return None

    # Only the single best task is needed, so one max() pass replaces sorting
    # the whole queue. max() returns the first maximal item it meets, which is
    # the same element sorted(..., reverse=True)[0] would have produced.
    best_task = max(task_queue, key=calculate_priority)

    # deque.remove() drops the first entry that compares equal to best_task.
    task_queue.remove(best_task)
    return best_task
@app.task
def process_tasks():
    """Take the highest-priority task off the queue and simulate running it.

    Prints a notice and does nothing else when the queue yields no task.
    Otherwise prints a start message, sleeps for the task's
    ``processing_time`` seconds, and prints a completion message.

    Returns:
        None.
    """
    task = get_next_task()
    if task:
        print(f"Processing {task['task_id']} with user tier {task['user_tier']}")
        time.sleep(task['processing_time'])  # Simulate work
        print(f"Done processing {task['task_id']}")
    else:
        print("No tasks in queue")
def add_task(task_id, user_tier, processing_time):
    """Append a new task to the in-memory queue, stamped with the current time.

    Args:
        task_id: Identifier printed by process_tasks() when the task runs.
        user_tier: Key into ``BASE_SCORE`` ("free" or "paid").
        processing_time: Declared duration in seconds. It lowers the task's
            priority score and is how long process_tasks() sleeps for it.

    Raises:
        ValueError: If ``user_tier`` has no entry in ``BASE_SCORE``.
    """
    # Reject unknown tiers here: once such a task is queued, every later
    # get_next_task() call would raise KeyError inside calculate_priority(),
    # blocking all other tasks behind it.
    if user_tier not in BASE_SCORE:
        raise ValueError(
            f"Unknown user_tier {user_tier!r}; expected one of {sorted(BASE_SCORE)}"
        )

    task = {
        "task_id": task_id,
        "user_tier": user_tier,  # "free" or "paid"
        "processing_time": processing_time,
        "enqueue_time": datetime.now(),  # start of the waiting-time clock
    }
    task_queue.append(task)
# Seed the queue with three sample tasks.
# NOTE(review): these statements sit at module top level, so they run on every
# import of this module (presumably including imports done by Celery workers)
# -- confirm that is intended before reusing this outside a demo.
add_task(task_id="task_1", user_tier="free", processing_time=5)
add_task(task_id="task_2", user_tier="paid", processing_time=3)
add_task(task_id="task_3", user_tier="free", processing_time=2)

# Example execution: hand process_tasks to Celery via .delay().
process_tasks.delay()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment