Created
June 22, 2016 09:36
-
-
Save simopal6/df699f3aa1b7e01c5b0be663de9c8551 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Thread pool setup. `threads` and `pool` are intentionally globals to avoid
-- serialization issues (https://github.com/torch/threads/issues/43).
threads = require "threads"
threads.Threads.serialization('threads.sharedserialize')

-- Init callback handed to the pool: it loads the modules that the jobs
-- reference through the globals `threads_t` and `sys`.
local function init_worker()
   threads_t = require "threads"
   require "sys"
end

pool = threads.Threads(5, init_worker)
-- Per-task synchronization state, indexed by task number:
--   ready_flags[i]   one-element tensor, 0 = not ready, non-zero = ready
--   ready_mutexes[i] mutex guarding the flag
--   ready_conds[i]   condition signalled when the flag is raised
ready_flags, ready_mutexes, ready_conds = {}, {}, {}

-- Three tasks are used throughout the script
local task_count = 3
for task_id = 1, task_count do
   ready_flags[task_id] = torch.Tensor(1):fill(0)
   ready_mutexes[task_id] = threads.Mutex()
   ready_conds[task_id] = threads.Condition()
end
--- Queue one asynchronous run of a task on the global thread pool.
-- The job sleeps for a random duration (simulated work), then, while holding
-- the task's mutex, sets the task's ready flag to 1 and signals the task's
-- condition.
-- @param task index into ready_flags / ready_mutexes / ready_conds
function start_task(task)
   -- Fail early with a readable message instead of "attempt to index a nil value"
   assert(ready_mutexes[task] and ready_conds[task] and ready_flags[task],
          "start_task: unknown task " .. tostring(task))
   -- Prepare upvalues: the job receives the mutex/condition ids and rebuilds
   -- its own handles from them; the flag tensor is captured directly
   local ready_mutex_id = ready_mutexes[task]:id()
   local ready_cond_id = ready_conds[task]:id()
   local ready_flag = ready_flags[task]
   -- Add job
   print("M" .. task .. ": adding job")
   pool:addjob(
      function()
         print("T" .. task .. ": started")
         -- Work (simulated by sleeping for a random amount, 1 or 2)
         local work_time = torch.random(1,2)
         sys.sleep(work_time)
         -- Rebuild handles to the main thread's mutex and condition from their ids
         local ready_mutex = threads_t.Mutex(ready_mutex_id)
         local ready_cond = threads_t.Condition(ready_cond_id)
         -- Raise the flag and signal the waiter while holding the mutex
         print("T" .. task .. ": locking ready mutex")
         ready_mutex:lock()
         print("T" .. task .. ": locked")
         ready_flag:fill(1)
         print("T" .. task .. ": signalling")
         ready_cond:signal()
         print("T" .. task .. ": unlocking ready mutex")
         ready_mutex:unlock()
         print("T" .. task .. ": unlocked")
         print("T" .. task .. ": ended")
         -- Free the handles created in this job
         ready_mutex:free()
         ready_cond:free()
      end
   )
end
-- Kick off every task once, in order 1, 2, 3
for first_task = 1, 3 do
   start_task(first_task)
end
-- Main loop: pick a random task, wait until its job has raised the ready flag,
-- clear the flag and schedule the task again. Runs forever.
while true do
   -- Get random task
   local task = torch.random(1, 3)
   -- Get mutex, condition and flag
   local ready_mutex = ready_mutexes[task]
   local ready_cond = ready_conds[task]
   local ready_flag = ready_flags[task]
   -- Wait for this task only. pool:synchronize() would also work here, but it
   -- waits for all threads to be completed, not just the one of interest.
   print("M" .. task .. ": acquiring ready mutex")
   ready_mutex:lock()
   print("M" .. task .. ": checking flag (" .. ready_flag[1] .. ")")
   -- Re-check the flag after every wake-up; the job may also have finished
   -- before we got here, in which case we never wait at all
   while ready_flag[1] == 0 do
      -- Wait
      print("M" .. task .. ": waiting")
      ready_cond:wait(ready_mutex)
      print("Woken up, checking flag (" .. ready_flag[1] .. ")")
   end
   print("M" .. task .. ": ready")
   -- Clear flag
   ready_flag:fill(0)
   -- Release the ready lock before scheduling: no job for this task exists
   -- until start_task() below, so nothing else touches the flag, and the
   -- mutex is not held across pool:addjob()
   print("M" .. task .. ": unlocking ready mutex")
   ready_mutex:unlock()
   -- Restart task preparation
   print("M" .. task .. ": restarting task")
   start_task(task)
   -- Collect one finished job. Without this (or pool:synchronize()) the pool
   -- never gets a chance to retire completed jobs and the script stalls.
   -- NOTE(review): believed to be caused by the pool's bounded main queue
   -- filling up with uncollected results - confirm against torch/threads.
   -- The job of `task` has just completed, so dojob() should have a result to
   -- collect (or will have one momentarily) rather than blocking on unrelated
   -- work - TODO confirm hasjob() counts jobs whose results are uncollected.
   if pool:hasjob() then
      print("M" .. task .. ": calling dojob()")
      pool:dojob()
      print("M" .. task .. ": called")
   end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment