@adamharder
Created May 31, 2019 15:41
Multiprocessing queue backed by Redis
from multiprocessing import Process
from typing import Optional

import redis

_REDIS_KEY = "MY_GREAT_QUEUE"
_REDIS = redis.Redis()
_PROCESS_COUNT = 15


def add_item_to_process(some_item: str):
    # push an item onto the Redis set that backs the queue
    _REDIS.sadd(_REDIS_KEY, some_item)


def get_next_item() -> Optional[str]:
    # pop a random member; spop returns None once the set is empty
    next_item = _REDIS.spop(_REDIS_KEY)
    if next_item is not None:
        next_item = next_item.decode()
        if next_item == "":
            next_item = None
    print(next_item)
    return next_item


def item_processor():
    # keep pulling items until the queue is drained
    new_item = get_next_item()
    print(new_item)
    while new_item is not None:
        # do work with new_item
        print(f"DOING SOME WORK WITH {new_item}")
        new_item = get_next_item()


# the __main__ guard keeps child processes from re-running this block
# when multiprocessing uses the "spawn" start method
if __name__ == "__main__":
    # clear the queue (may not be necessary)
    _REDIS.expire(_REDIS_KEY, 0)

    # load up the queue
    for i in range(100):
        add_item_to_process(f"thing_{i}")

    # start the processes
    procs = []
    for _ in range(_PROCESS_COUNT):
        procs.append(Process(target=item_processor, args=()))
    for p in procs:
        p.start()

    # wait for the processes to drain the queue
    for p in procs:
        p.join()
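
Because the queue lives in Redis rather than in an in-process multiprocessing.Queue, any process that can reach the same Redis server can act as a worker. The sketch below is illustrative only: it assumes the Redis instance is reachable at a hypothetical host named "queue-host" (not part of the original gist) and reuses the same pop-and-decode logic as above.

# worker.py -- a minimal sketch of draining the same queue from another machine.
# "queue-host" and the worker count of 4 are hypothetical values, not from the gist.
from multiprocessing import Process

import redis

_REDIS_KEY = "MY_GREAT_QUEUE"
_REDIS = redis.Redis(host="queue-host", port=6379)


def get_next_item():
    # same pop-and-decode behaviour as get_next_item() above
    next_item = _REDIS.spop(_REDIS_KEY)
    return next_item.decode() if next_item else None


def item_processor():
    new_item = get_next_item()
    while new_item is not None:
        print(f"DOING SOME WORK WITH {new_item}")
        new_item = get_next_item()


if __name__ == "__main__":
    procs = [Process(target=item_processor) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

One design note: because a Redis set deduplicates members, adding the same string twice means it is only processed once; if duplicates or FIFO ordering matter, a Redis list (LPUSH to enqueue, RPOP or BRPOP to dequeue) would be the usual alternative.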