Skip to content

Instantly share code, notes, and snippets.

@mrcalvin
Created October 7, 2020 09:41
Show Gist options
  • Select an option

  • Save mrcalvin/aaf8af0c208b1da5204d525b5763871c to your computer and use it in GitHub Desktop.

Select an option

Save mrcalvin/aaf8af0c208b1da5204d525b5763871c to your computer and use it in GitHub Desktop.
::par::process
package require Thread
namespace eval ::par {
    # schedule --
    #
    #   Post one tpool job that reads the next chunk from the shared file
    #   channel $fh and processes it. Also install a readable handler on a
    #   pipe whose write end the job closes once it has detached $fh again.
    #
    # Arguments:
    #   tp        - thread pool id (from tpool::create)
    #   fh        - file channel, currently detached from every interpreter;
    #               jobs take turns attaching it, one at a time
    #   chunksize - number of characters the job reads with [chan read]
    #               before completing the current line with [chan gets]
    #
    # Results:
    #   The tpool job id. The handler runs from the event loop of the calling
    #   thread, and what it does depends on whether $fh is at EOF:
    #     - not at EOF: it schedules a follower job and appends that job's
    #       id to the namespace variable jobs;
    #     - at EOF: it leaves $fh attached to the calling thread and sets
    #       the namespace variable done to 1.
    proc schedule {tp fh chunksize} {
        # The pipe never carries data. Closing its write end is the signal
        # that the worker has released the file channel.
        lassign [chan pipe] pr pw
        thread::detach $pw

        set handler [list {pr fh tp chunksize} {
            variable jobs
            variable done
            # Drain the non-blocking read end; only EOF is of interest.
            chan read $pr
            if {![chan eof $pr]} {
                return
            }
            ### 1: EOF signals that a job has finished reading a
            ### chunk, and released the file channel
            chan close $pr
            thread::attach $fh
            ### 2: Has the file been completely consumed?
            if {![chan eof $fh]} {
                ## 2a: NO, hand the channel to the next job.
                thread::detach $fh
                lappend jobs [schedule $tp $fh $chunksize]
            } else {
                ## 2b: YES, keep $fh attached here (process closes it) and
                ## signal that the overall job schedule is complete.
                set done 1
            }
        } [namespace current]]
        chan configure $pr -blocking 0

        set job [tpool::post -nowait $tp [list apply {{fh pw chunksize} {
            ### 1: Read the chunk, then finish the line it ends in so that
            ### no line is split between two jobs.
            thread::attach $fh
            thread::attach $pw
            set data [chan read $fh $chunksize]
            append data [chan gets $fh]
            thread::detach $fh
            ### 2: Release the file channel to a follower job, to be scheduled next
            chan close $pw
            ### 3: Process the chunk, the more time is spent here, the more any par pays off.
            # NOTE(review): when [chan read] stops right after a trailing
            # newline at EOF, [split] yields an extra empty element. The summed
            # count can therefore differ by one depending on where chunk
            # boundaries fall. Confirm whether the result must be line-exact.
            llength [split $data \n]
        }} $fh $pw $chunksize]]

        chan event $pr readable [list apply $handler $pr $fh $tp $chunksize]
        return $job
    }

    # process --
    #
    #   Read $file in consecutive chunks, process each chunk on a thread pool
    #   of $workers threads, and return the sum of the per-chunk results.
    #   Reading is serialised: the next chunk is only scheduled after the
    #   previous job has released the channel. Processing runs in parallel.
    #
    # Arguments:
    #   file    - path of the file to process
    #   workers - pool size (both -minworkers and -maxworkers); a positive integer
    #   chunks  - number of chunks to aim for (defaults to $workers); a positive number
    #
    # Results:
    #   The sum of the values returned by the jobs.
    #
    # Side effects:
    #   Runs the event loop (vwait) until every chunk has been scheduled.
    #   The thread pool, the file channel and the namespace variables
    #   jobs/done are released on both the success and the error path.
    proc process {file workers {chunks ""}} {
        variable jobs
        variable done
        if {$chunks eq ""} {
            set chunks $workers
        }
        if {!([string is integer -strict $workers] && $workers >= 1)} {
            return -code error "workers must be a positive integer, got \"$workers\""
        }
        if {!([string is double -strict $chunks] && $chunks > 0)} {
            return -code error "chunks must be a positive number, got \"$chunks\""
        }
        set fsize [file size $file]
        # With integer operands $fsize/$chunks would be floor division,
        # making ceil() a no-op. Force float division so the size rounds up.
        # NOTE(review): fsize is in bytes while [chan read] counts
        # characters, so multi-byte encodings yield larger chunks.
        set chunksize [expr {int(ceil(double($fsize) / $chunks))}]

        # Start from a clean job list, even if an earlier run was aborted.
        set jobs [list]
        set tp [tpool::create -minworkers $workers -maxworkers $workers]
        set fh ""
        try {
            set fh [open $file r]
            thread::detach $fh
            lappend jobs [schedule $tp $fh $chunksize]
            ### 1: Join point: Have all jobs been scheduled?
            vwait [namespace current]::done
            ### 2: Join point: Have all jobs been completed?
            set results [list]
            while {[llength $jobs]} {
                # tpool::wait stores the still-pending ids back into jobs.
                foreach r [tpool::wait $tp $jobs jobs] {
                    lappend results [tpool::get $tp $r]
                }
            }
            return [tcl::mathop::+ {*}$results]
        } finally {
            # Best effort: on an error path the channel may still be
            # detached (owned by no interpreter) and cannot be closed here.
            if {$fh ne ""} {
                catch {chan close $fh}
            }
            tpool::release $tp
            unset -nocomplain jobs
            unset -nocomplain done
        }
    }
}
# Demo run: process data.txt with a pool of 12 worker threads; the chunk
# count defaults to the worker count. The returned sum is not printed.
::par::process {data.txt} 12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment