-
-
Save rkh/2037092 to your computer and use it in GitHub Desktop.
| require 'thread' | |
| class Worker | |
| def initialize(count = 1) | |
| @queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new | |
| add_worker(count) | |
| end | |
| def add_worker(count = 1) | |
| @mutex.synchronize do | |
| @threads += count.times.map { Thread.new { @queue.pop.call until @closing } } | |
| end | |
| end | |
| def run(block = Proc.new) | |
| @queue << block | |
| end | |
| def close(&block) | |
| run do | |
| @closing = true | |
| yield if block_given? | |
| wakeup | |
| end | |
| end | |
| def join | |
| @threads.each(&:join) | |
| end | |
| private | |
| def wakeup | |
| run { wakeup if @queue.num_waiting > 0 } | |
| end | |
| end |
I just removed wait. Dunno what it did there.
What are you trying to accomplish with run { @closing = true }?
Couldn't you replace run { @closing = true } with @closing = true since the running threads will see it automagically?
def wait
cv, mutex = ConditionalVariable.new, Mutex.new
mutex.synchronize do
run { mutex.synchronize { cv.signal } }
cv.wait(mutex)
end
endActually nevermind, I see what it's necessary. :-)
That run { @closing = true } is to make sure that the block passed to close will run.
Note that wait really only makes sense with worker size 1.
I like to have one Worker for reading from an IO and one for writing to it, one for logging to stdout and one for doing all the work. I also like to set the thread count to 0 and increase it later, which turns it into something like a deferrable.
I guess if you really wanted your threads to exit gracefully (honor the until @closing part), you'd need to send that for all of your threads, not just once:
count.times { run { @closing = true } }
Why? I clear the queue anyways. I mean, there still is a race condition in there (i.e. someone queues after I cleared it but before I queue the close). After the first thread called the block, @closing will be true, no matter how often it is queued.
Right, but let's say you have 10 threads, and there is no work to be done but you want to close/exit gracefully. You'll only schedule one @closing = true, which means only one of your threads will return from the blocking @queue.pop method and realize @closing = true. Your other threads will continue blocking on that call waiting for more work. I think it only really matters if you have some cleanup (e.g., cleaning up file handles, etc) that you want each of your worker threads to do before the program exits.
right
Exactly what are you using this for?
@ryanlecompte check out the dummy addition I made.
@judofyr just nonsens scripts atm
@judofyr just nonsens scripts atm
Wouldn't it be better to do something like this to make sure the block is called?
def close
run do
@closing = true
yield if block_given?
end
endright
If you have more than one worker, only one of them will shutdown, as the @closing will only be set once. You probably need to remember your worker threads and schedule close on each of them directly (or at least schedule close worker_threads.count times.
No, @closing is local to the worker instance which can launch more than one thread, but it will be true in all threads.
Meh, of course... But then I wouldn't call @queue.clear to have something like a soft-stop and to not lose already scheduled jobs.
But that rather depends on your use case.
I have an old piece of code that looks awfully similar to this, but uses throw/catch instead of the @closing thing: http://burgestrand.se/code/ruby-thread-pool/thread-pool.rb (I partly did it as an experiment with rocco)
It does not support adding more workers, however.
You could probably avoid the busy-sleep in #wait by storing the Thread.current, call
sleepand rather callthread.wakeupwhen you pop.