Skip to content

Instantly share code, notes, and snippets.

@simopal6
Created June 22, 2016 09:36
Show Gist options
  • Select an option

  • Save simopal6/df699f3aa1b7e01c5b0be663de9c8551 to your computer and use it in GitHub Desktop.

Select an option

Save simopal6/df699f3aa1b7e01c5b0be663de9c8551 to your computer and use it in GitHub Desktop.
-- Worker pool setup. `threads` and `pool` are intentionally globals, which
-- avoids serialization issues (https://github.com/torch/threads/issues/43).
threads = require("threads")
-- Use 'threads.sharedserialize' as the serialization module for the pool.
threads.Threads.serialization("threads.sharedserialize")

-- Initializer handed to the pool: defines the `threads_t` global and loads
-- `sys`, both of which the jobs queued by start_task rely on.
local function init_worker()
    threads_t = require("threads")
    require("sys")
end

-- Pool of 5 workers.
pool = threads.Threads(5, init_worker)
-- Per-task synchronization state for the 3 tasks, indexed by task number:
--   ready_flags[i]    one-element tensor; 0 = not finished, 1 = finished
--   ready_mutexes[i]  mutex held while the flag is read or written
--   ready_conds[i]    condition signalled by the job once the flag is set
ready_flags, ready_mutexes, ready_conds = {}, {}, {}
for task_id = 1, 3 do
    ready_conds[task_id] = threads.Condition()
    ready_mutexes[task_id] = threads.Mutex()
    ready_flags[task_id] = torch.Tensor(1):fill(0)
end
--- Queue one run of a task on the worker pool.
-- The queued job sleeps for 1 or 2 seconds, then, while holding the task's
-- mutex, sets the task's ready flag to 1 and signals the task's condition.
-- @param task index into ready_flags / ready_mutexes / ready_conds
function start_task(task)
    -- The job captures only the flag tensor and the numeric ids; the worker
    -- builds its own mutex/condition handles from those ids.
    local flag = ready_flags[task]
    local mutex_id = ready_mutexes[task]:id()
    local cond_id = ready_conds[task]:id()

    local function job()
        local tag = "T" .. task
        print(tag .. ": started")

        -- Simulated work.
        sys.sleep(torch.random(1, 2))

        -- Worker-side handles onto the mutex and condition created by the
        -- main thread.
        local mutex = threads_t.Mutex(mutex_id)
        local cond = threads_t.Condition(cond_id)

        -- Publish completion: flag write and signal both happen under the lock.
        print(tag .. ": locking ready mutex")
        mutex:lock()
        print(tag .. ": locked")
        flag:fill(1)
        print(tag .. ": signalling")
        cond:signal()
        print(tag .. ": unlocking ready mutex")
        mutex:unlock()
        print(tag .. ": unlocked")
        print(tag .. ": ended")

        -- Release the worker-side handles.
        mutex:free()
        cond:free()
    end

    print("M" .. task .. ": adding job")
    pool:addjob(job)
end
-- Kick off the first run of each of the 3 tasks, in order.
for task = 1, 3 do
    start_task(task)
end
-- Main loop (runs forever): pick a random task, wait until its current run
-- has signalled completion, restart it, then consume one finished-job result
-- from the pool.
while true do
    -- Get random task
    local task = torch.random(1, 3)
    -- Get mutex, condition and flag
    local ready_mutex = ready_mutexes[task]
    local ready_cond = ready_conds[task]
    local ready_flag = ready_flags[task]

    -- pool:synchronize() is deliberately not used here: it would wait for
    -- every queued job, not just the task selected above.

    -- Wait for the selected task. The flag is re-checked in a loop because a
    -- wake-up alone does not prove the flag was set.
    print("M" .. task .. ": acquiring ready mutex")
    ready_mutex:lock()
    print("M" .. task .. ": checking flag (" .. ready_flag[1] .. ")")
    while ready_flag[1] == 0 do
        print("M" .. task .. ": waiting")
        ready_cond:wait(ready_mutex)
        print("Woken up, checking flag (" .. ready_flag[1] .. ")")
    end
    print("M" .. task .. ": ready")

    -- Clear the flag and queue the next run while still holding the mutex.
    ready_flag:fill(0)
    print("M" .. task .. ": restarting task")
    start_task(task)
    print("M" .. task .. ": unlocking ready mutex")
    ready_mutex:unlock()

    -- Consume exactly one end callback per completion observed above.
    -- Per the torch/threads documentation, each finished job hands an end
    -- callback back to the main thread, and dojob() runs one of them,
    -- blocking until one is available. Without this call the results are
    -- never consumed, and once the pool's result queue fills up the workers
    -- stall while handing back results, so no flag is ever set again.
    -- NOTE(review): the result queue capacity is assumed to be bounded by the
    -- pool size; confirm against the installed threads version.
    -- The call cannot block indefinitely: a job has already signalled, so its
    -- result is queued or about to be. hasjob() is no guard here; it reports
    -- unfinished jobs, not whether a result is ready to be consumed.
    print("M" .. task .. ": calling dojob()")
    pool:dojob()
    print("M" .. task .. ": called")
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment