Created
May 31, 2019 15:41
-
-
Save adamharder/da196f38c3366d02f2e1e18fd79c2244 to your computer and use it in GitHub Desktop.
Multiprocessing queue backed by Redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process
from typing import Optional

import redis
# Name of the Redis key that holds the pending work items.
_REDIS_KEY = "MY_GREAT_QUEUE"
# Shared client; redis.Redis() with no arguments uses the library defaults.
_REDIS = redis.Redis()
# Number of worker processes to launch.
_PROCESS_COUNT = 15

# Clear the queue (may not be necessary).
# Only do this when run as the main script: with the "spawn" start method
# every worker re-imports this module, and an unconditional reset would wipe
# the shared key while other workers are still consuming it.
if __name__ == "__main__":
    _REDIS.delete(_REDIS_KEY)
def add_item_to_proess(some_item: str) -> None:
    """Add ``some_item`` to the shared Redis work set.

    The items live in a Redis set (SADD), so adding a value that is already
    pending does not create a second entry.
    """
    _REDIS.sadd(_REDIS_KEY, some_item)
def get_next_item() -> Optional[str]:
    """Pop one pending item from the shared Redis set.

    Returns:
        The popped item decoded to ``str``, or ``None`` when nothing was
        popped. An empty-string item is also reported as ``None``, so callers
        treat it the same as an exhausted queue.
    """
    raw = _REDIS.spop(_REDIS_KEY)
    if raw is None:
        next_item = None
    else:
        # The reply is decoded with the default codec (presumably bytes from
        # the client -- confirm if decode_responses is ever enabled).
        # `or None` maps the empty string to None.
        next_item = raw.decode() or None
    # Trace output: shows what each worker pulled, including the final None.
    print(next_item)
    return next_item
def item_processor():
    """Worker loop: keep pulling items until ``get_next_item`` returns None."""
    current = get_next_item()
    print(current)
    while True:
        if current is None:
            # Nothing left to pull; this worker is done.
            return
        # do work with the current item
        print(f"DOING SOME WORK WITH {current}")
        current = get_next_item()
# Guard the driver so that worker processes which re-import this module
# (the "spawn" start method, default on Windows and macOS) do not reload the
# queue or try to launch their own workers.
if __name__ == "__main__":
    # load up the queue
    for i in range(100):
        add_item_to_proess(f"thing_{i}")

    procs = [Process(target=item_processor) for _ in range(_PROCESS_COUNT)]

    # start the processes
    for p in procs:
        p.start()

    # wait for the processes
    for p in procs:
        p.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment