module Parallel
Constants
- Stop
- VERSION
Public Class Methods
each(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 202
# Runs the block for every element of +array+ by delegating to +map+ with
# :preserve_results forced to false, then returns +array+ itself.
#
# array   - the items handed to +map+
# options - Hash forwarded to +map+; the caller's hash is not modified
#           because +merge+ builds a new one
def each(array, options={}, &block)
  without_results = options.merge(:preserve_results => false)
  map(array, without_results, &block)
  array
end
each_with_index(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 207
# Same as +each+, but adds :with_index => true to the options so the block
# also receives the item's index. Returns whatever +each+ returns.
def each_with_index(array, options={}, &block)
  indexed = options.merge(:with_index => true)
  each(array, indexed, &block)
end
in_processes(options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 196
# Runs the block once per process index by mapping over 0...count with
# :in_processes set to that count.
#
# options - either a count or a Hash that may carry :count (see
#           extract_count_from_options); when no count is given,
#           +processor_count+ is used instead
def in_processes(options = {}, &block)
  requested, options = extract_count_from_options(options)
  count = requested || processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end
in_threads(options={:count => 2}) { |i| ... }
click to toggle source
# File lib/parallel.rb, line 189
# Starts +count+ threads, yields each thread's index (0...count) to the
# block, and returns the threads' values in index order.
#
# options - either a count or a Hash with :count; defaults to two threads
#
# Thread#value joins the thread, so this call blocks until all threads are
# done and re-raises an exception that terminated a thread.
def in_threads(options={:count => 2})
  count, _ = extract_count_from_options(options)
  threads = Array.new(count) { |i| Thread.new { yield(i) } }
  threads.map!(&:value)
end
map(source, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 211
# Runs the block over the items of +source+ and returns the result of the
# chosen work_* strategy.
#
# source  - the items; wrapped in a JobFactory
# options - Hash; keys read here:
#           :in_threads       - number of threads to use
#           :in_processes     - number of processes to use
#           :preserve_results - pass false to not keep block results
#           :finish           - when set, results are kept regardless
#           (further keys are consumed by the helpers this method calls)
#
# Strategy selection:
# * on a "java" RUBY_PLATFORM without :in_processes, or when :in_threads is
#   given, work is done in threads;
# * otherwise in processes, unless Process.fork is unavailable, in which
#   case a warning is printed and the work is done directly;
# * a worker count of 0 also means the work is done directly.
def map(source, options = {}, &block)
  # Work on a private copy: the keys written below (:mutex, :return_results,
  # and :finish when a progress bar is requested) must not leak into the
  # caller's hash, and a frozen hash must be accepted.
  options = options.dup
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ && !options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  # never start more workers than there are jobs
  size = [job_factory.size, size].min

  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  if size == 0
    work_direct(job_factory, options, &block)
  elsif method == :in_threads
    work_in_threads(job_factory, options.merge(:count => size), &block)
  else
    work_in_processes(job_factory, options.merge(:count => size), &block)
  end
end
map_with_index(array, options={}, &block)
click to toggle source
# File lib/parallel.rb, line 245
# Same as +map+, but adds :with_index => true to the options so the block
# also receives the item's index.
def map_with_index(array, options={}, &block)
  indexed = options.merge(:with_index => true)
  map(array, indexed, &block)
end
Private Class Methods
add_progress_bar!(job_factory, options)
click to toggle source
# File lib/parallel.rb, line 251
# When options[:progress] is set, creates a ProgressBar and replaces
# options[:finish] with a lambda that first calls the previous :finish
# callback (if any) and then increments the bar.
#
# options[:progress] may be:
#   true                    - bar titled "Progress"
#   something with #to_str  - used as the title
#   anything else           - merged as-is over the default bar options
#
# Returns nil when no progress bar was requested, otherwise the new :finish
# lambda. Raises when the job factory reports an infinite size.
def add_progress_bar!(job_factory, options)
  progress_options = options[:progress]
  return unless progress_options

  raise "Progressbar can only be used with array like items" if job_factory.size == Float::INFINITY
  # loaded lazily so the gem is only needed when a bar is requested
  require 'ruby-progressbar'

  progress_options =
    if progress_options == true
      { title: "Progress" }
    elsif progress_options.respond_to?(:to_str)
      { title: progress_options.to_str }
    else
      progress_options
    end

  defaults = { total: job_factory.size, format: '%t |%E | %B | %a' }
  progress = ProgressBar.create(defaults.merge(progress_options))

  old_finish = options[:finish]
  options[:finish] = lambda do |item, i, result|
    old_finish.call(item, i, result) if old_finish
    progress.increment
  end
end
call_with_index(item, index, options, &block)
click to toggle source
# File lib/parallel.rb, line 433
# Calls the block with +item+, plus +index+ when options[:with_index] is
# set.
#
# Returns the block's value only when options[:return_results] is set;
# otherwise nil, to avoid the GC overhead of passing large results around.
def call_with_index(item, index, options, &block)
  arguments = options[:with_index] ? [item, index] : [item]
  outcome = block.call(*arguments)
  outcome if options[:return_results]
end
create_workers(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 371
# Builds options[:count] workers via +worker+ and returns them as an Array.
#
# Every call to +worker+ receives the array being built (the same object,
# not a copy) as :started_workers, so at that moment it holds exactly the
# workers created before it.
def create_workers(job_factory, options, &block)
  Array.new(options[:count]).each_with_object([]) do |_, started|
    started << worker(job_factory, options.merge(:started_workers => started), &block)
  end
end
extract_count_from_options(options)
click to toggle source
options is either an Integer or a Hash with :count
# File lib/parallel.rb, line 423
# Normalizes the "count or options" argument into a [count, options] pair.
#
# A Hash yields [hash[:count], hash] (the same hash object); any other
# value is treated as the count itself and paired with a new empty Hash.
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
handle_exception(exception, results)
click to toggle source
# File lib/parallel.rb, line 416
# Decides what a finished run returns.
#
# * exception whose class is exactly Parallel::Break or Parallel::Kill:
#   returns nil
# * any other exception: re-raised
# * no exception: returns +results+
def handle_exception(exception, results)
  control_flow = [Parallel::Break, Parallel::Kill]

  if control_flow.include?(exception.class)
    nil
  elsif exception
    raise exception
  else
    results
  end
end
process_incoming_jobs(read, write, job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 403
# Job loop: until +read+ reports EOF, loads one marshalled job from it,
# unpacks it with the job factory, runs the block on it and writes the
# marshalled result to +write+.
#
# A StandardError raised while running the block does not end the loop; it
# is wrapped in an ExceptionWrapper and written back as the result.
# Errors from loading or unpacking a job are not rescued here.
#
# NOTE(review): Marshal.load is only safe on trusted input; presumably
# +read+ is the pipe fed by this library's own parent process -- confirm.
def process_incoming_jobs(read, write, job_factory, options, &block)
  until read.eof?
    item, index = job_factory.unpack(Marshal.load(read))

    result =
      begin
        call_with_index(item, index, options, &block)
      rescue StandardError => e
        ExceptionWrapper.new(e)
      end

    Marshal.dump(result, write)
  end
end
replace_worker(job_factory, workers, i, options, blk)
click to toggle source
# File lib/parallel.rb, line 361
# Stops the worker in slot +i+ (if there is one) and stores a newly created
# worker in that slot. Returns the new worker.
#
# The new worker is created with :started_workers set to all other entries
# of +workers+, i.e. the list without the one that was just stopped.
def replace_worker(job_factory, workers, i, options, blk)
  # the previous occupant of this slot is no longer used ... stop it
  stale = workers[i]
  stale.stop if stale

  # create the replacement
  still_running = workers - [stale]
  workers[i] = worker(job_factory, options.merge(started_workers: still_running), &blk)
end
with_instrumentation(item, index, options) { || ... }
click to toggle source
# File lib/parallel.rb, line 444
# Wraps one unit of work (the block) with the optional callbacks.
#
# options[:start]  - called with (item, index) before the block
# options[:finish] - called with (item, index, result) after the block
# Both callbacks run inside options[:mutex].synchronize.
#
# Returns the block's result, or nil when options[:preserve_results] is
# exactly false.
def with_instrumentation(item, index, options)
  start_hook = options[:start]
  finish_hook = options[:finish]

  if start_hook
    options[:mutex].synchronize { start_hook.call(item, index) }
  end

  outcome = yield

  if finish_hook
    options[:mutex].synchronize { finish_hook.call(item, index, outcome) }
  end

  options[:preserve_results] == false ? nil : outcome
end
work_direct(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 276
# Processes all jobs one after another in the calling thread.
#
# Pulls [item, index] pairs from the job factory until +next+ returns a
# falsy value, runs each through with_instrumentation/call_with_index and
# returns the collected values in the order the jobs were pulled.
def work_direct(job_factory, options, &block)
  collected = []

  while (job = job_factory.next)
    item, index = job
    outcome = with_instrumentation(item, index, options) { call_with_index(item, index, options, &block) }
    collected << outcome
  end

  collected
end
work_in_processes(job_factory, options, &blk)
click to toggle source
# File lib/parallel.rb, line 311
# Distributes the jobs over worker processes, driving each worker from its
# own thread (started through +in_threads+).
#
# Without options[:isolation] the workers are created up front; with it,
# the worker in a slot is replaced before every job.
#
# Each thread pulls jobs from the job factory until there are none left or
# some thread has recorded an exception. Results are stored by job index.
# The first StandardError raised while working a job is recorded; when it
# is a Parallel::Kill, all other workers are killed as well. The final
# return value / re-raise is decided by +handle_exception+.
def work_in_processes(job_factory, options, &blk)
  workers =
    if options[:isolation]
      [] # we create workers per job and not beforehand
    else
      create_workers(job_factory, options, &blk)
    end
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe
  exception = nil

  UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |i|
      worker = workers[i]

      begin
        loop do
          break if exception
          item, index = job_factory.next
          break unless index

          if options[:isolation]
            worker = replace_worker(job_factory, workers, i, options, blk)
          end

          worker.thread = Thread.current

          begin
            result = with_instrumentation(item, index, options) do
              worker.work(job_factory.pack(item, index))
            end
            results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
          rescue StandardError => e
            exception = e
            if Parallel::Kill === exception
              # In isolation mode slots that were never filled are nil, and a
              # worker only gets a thread once it has picked up a job, so
              # skip empty slots and only kill threads that exist.
              (workers - [worker]).compact.each do |w|
                w.thread.kill if w.thread
                UserInterruptHandler.kill(w.pid)
              end
            end
          end
        end
      ensure
        worker.stop if worker
      end
    end
  end

  handle_exception(exception, results)
end
work_in_threads(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 287
# Processes the jobs in threads started through +in_threads+.
#
# Every thread keeps pulling jobs from the job factory until there are none
# left or some thread has recorded an exception. Results are stored by job
# index. The first StandardError raised while working a job is recorded and
# handed, together with the results, to +handle_exception+.
#
# Raises immediately when options[:interrupt_signal] is given.
def work_in_threads(job_factory, options, &block)
  raise "interrupt_signal is no longer supported for threads" if options[:interrupt_signal]

  results = []
  results_mutex = Mutex.new # arrays are not thread-safe on jRuby
  exception = nil

  in_threads(options) do
    # as long as nothing failed and there are more jobs, work on one of them
    until exception
      job = job_factory.next
      break unless job

      begin
        item, index = job
        result = with_instrumentation(item, index, options) do
          call_with_index(item, index, options, &block)
        end
        results_mutex.synchronize { results[index] = result }
      rescue StandardError => e
        exception = e
      end
    end
  end

  handle_exception(exception, results)
end
worker(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 379
# Forks a child process that serves jobs over a pair of pipes and returns a
# Worker built from the parent's pipe ends and the child's pid.
#
# Two pipes are opened: one the parent writes jobs to (child reads), one
# the child writes results to (parent reads).
#
# In the child: close_pipes is called on every worker listed in
# options[:started_workers] (the key is removed from the child's options),
# the parent's pipe ends are closed, and process_incoming_jobs runs on the
# child's ends, which are closed afterwards even on error.
#
# In the parent: the child's pipe ends are closed.
def worker(job_factory, options, &block)
  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      options.delete(:started_workers).each(&:close_pipes)
      [parent_write, parent_read].each(&:close)

      process_incoming_jobs(child_read, child_write, job_factory, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  [child_read, child_write].each(&:close)

  Worker.new(parent_read, parent_write, pid)
end