Created
October 7, 2020 09:41
-
-
Save mrcalvin/aaf8af0c208b1da5204d525b5763871c to your computer and use it in GitHub Desktop.
::par::process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package require Thread

namespace eval ::par {

    # schedule --
    #
    #   Post one chunk-reading job to the thread pool and install an event
    #   handler that posts the follow-up job as soon as the worker has
    #   released the shared file channel.
    #
    # Arguments:
    #   tp         Thread pool handle the job is posted to.
    #   fh         File channel; callers detach it from their thread before
    #              calling, so that the worker can attach it.
    #   chunksize  Number of characters a job reads before it completes the
    #              current line with one additional [chan gets].
    #
    # Results:
    #   Returns the job id obtained from tpool::post. Ids of follow-up jobs
    #   are appended to the namespace variable "jobs"; the namespace
    #   variable "done" is set to 1 once the file channel reports EOF.
    proc schedule {tp fh chunksize} {
        variable jobs
        variable done

        # The pipe carries no data. The worker closes the write side once it
        # is finished with the file channel, which surfaces as EOF on the
        # read side in this thread's event loop.
        lassign [chan pipe] pr pw
        thread::detach $pw

        set handler [list {pr fh tp chunksize} {
            variable jobs
            variable done

            # Reading is only needed so that EOF on the pipe gets detected.
            chan read $pr
            if {![chan eof $pr]} {
                return
            }

            ### 1: EOF signals that a job has finished reading a chunk and
            ###    has released the file channel.
            chan close $pr
            thread::attach $fh

            ### 2: Has the file been completely consumed?
            if {[chan eof $fh]} {
                ## 2a: YES, signal that the overall job schedule is complete.
                ##     The file channel stays attached to this thread.
                set done 1
            } else {
                ## 2b: NO, hand the channel over to the next job.
                thread::detach $fh
                lappend jobs [schedule $tp $fh $chunksize]
            }
        } [namespace current]]

        chan configure $pr -blocking 0

        set job [tpool::post -nowait $tp [list apply {{fh pw chunksize} {
            ### 1: Read the chunk, extended to the end of the current line.
            thread::attach $fh
            thread::attach $pw
            set data [chan read $fh $chunksize]
            append data [chan gets $fh]
            thread::detach $fh

            ### 2: Release the file channel to a follower job, to be
            ###    scheduled next.
            chan close $pw

            ### 3: Process the chunk. The more time is spent here, the more
            ###    the parallel schedule pays off.
            llength [split $data \n]
        }} $fh $pw $chunksize]]

        chan event $pr readable [list apply $handler $pr $fh $tp $chunksize]
        return $job
    }

    # process --
    #
    #   Read a file in roughly equal chunks, process the chunks on a pool of
    #   worker threads, and sum up the per-chunk results.
    #
    # Arguments:
    #   file     Path of the file to read.
    #   workers  Number of worker threads in the pool (used as both the
    #            minimum and the maximum pool size).
    #   chunks   Number of chunks the file size is divided into; defaults to
    #            the number of workers. Must be a positive number.
    #
    # Results:
    #   Returns the sum of all job results (0 if no job produced a result).
    #   Raises an error if chunks is not a positive number, or if the file,
    #   the pool, or any job fails.
    proc process {file workers {chunks ""}} {
        variable jobs
        variable done

        if {$chunks eq ""} {
            set chunks $workers
        }
        if {![string is double -strict $chunks] || $chunks <= 0} {
            return -code error \
                "chunks must be a positive number, got \"$chunks\""
        }

        # Divide in floating point: with two integer operands "/" already
        # truncates, which would turn ceil() into a no-op. Note that the
        # file size is in bytes while [chan read] counts characters, so the
        # chunks are only approximately equal for multi-byte encodings.
        set fsize [file size $file]
        set chunksize [expr {int(ceil(double($fsize) / $chunks))}]

        # Start from a clean slate, in case an earlier run failed midway.
        set jobs [list]

        set tp [tpool::create -minworkers $workers -maxworkers $workers]
        try {
            set fh [open $file r]
            thread::detach $fh
            lappend jobs [schedule $tp $fh $chunksize]

            ### 1: Join point: Have all jobs been scheduled?
            vwait [namespace current]::done

            ### 2: Join point: Have all jobs been completed?
            set results [list]
            while {[llength $jobs]} {
                # tpool::wait stores the still-pending job ids back into
                # "jobs" and returns the ones that have completed.
                foreach r [tpool::wait $tp $jobs jobs] {
                    lappend results [tpool::get $tp $r]
                }
            }
            set total [tcl::mathop::+ {*}$results]
        } finally {
            # Best effort: after a failure the channel may not be attached
            # to this thread, in which case closing it here fails.
            if {[info exists fh]} {
                catch {chan close $fh}
            }
            # Drop the pool reference so the worker threads are torn down.
            tpool::release $tp
            unset -nocomplain jobs
            unset -nocomplain done
        }
        return $total
    }
}

::par::process "data.txt" 12
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment