class Proxy::Dynflow::ProcessManager

An abstraction for managing local processes.

It can be used to:

@example Run date command and collect its output

pm = ProcessManager.new('date')
pm.run!
pm.status #=> 0
pm.stdout.to_s.chomp #=> "Thu Feb  3 04:27:42 PM CET 2022"

@example Run a shell loop, outputting all the lines it generates

pm = ProcessManager.new(['/bin/sh', '-c', 'for i in 1 2 3; do echo $i; sleep 1; done'])
pm.on_stdout { |data| puts data; '' }
pm.run!
#=> 1
#=> 2
#=> 3

@example Run bc (calculator) interactively and count down from 10 to 0

pm = ProcessManager.new('bc')
pm.on_stdout do |data|
  if data.match?(/^\d+/)
    n = data.to_i
    if n.zero?
      pm.stdin.to_io.close
    else
      pm.stdin.add_data("#{n} - 1\n")
    end
  end
  data
end
pm.stdin.add_data("10\n")
pm.run!
pm.stdout.to_s.lines #=. ["10\n", "9\n", "8\n", "7\n", "6\n", "5\n", "4\n", "3\n", "2\n", "1\n", "0\n"]

@attr_reader [Proxy::Dynflow::IOBuffer] stdin IOBuffer buffering writes to child process' standard input @attr_reader [Proxy::Dynflow::IOBuffer] stdout IOBuffer buffering reads from child process' standard output @attr_reader [Proxy::Dynflow::IOBuffer] stderr IOBuffer buffering reads from child process' standard error @attr_reader [nil, Integer] pid Process id of the child process, nil if the process was not started yet, -1 if the process could not be started @attr_reader [nil, Integer] status Exit status of the child process. nil if the child process has not finished yet, 255 if the process could not be started

Attributes

pid[R]
status[R]
stderr[R]
stdin[R]
stdout[R]

Public Class Methods

new(command) click to toggle source

@param [String, [String], [Hash, String]] command A command to run in one of the forms accepted by Kernel.spawn

# File lib/smart_proxy_dynflow/process_manager.rb, line 55
def initialize(command)
  @command = command
  @stdin  = IOBuffer.new(nil)
  @stdout = IOBuffer.new(nil)
  @stderr = IOBuffer.new(nil)
end

Public Instance Methods

close() click to toggle source

Makes the process manager close all the pipes it may have opened to communicate with the child process

@return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 159
def close
  [@stdin, @stdout, @stderr].each(&:close)
end
done?() click to toggle source

Determines whether the child process of the process manager already finished

@return [true, false] whether the child process of the process manager already finished

# File lib/smart_proxy_dynflow/process_manager.rb, line 104
def done?
  started? && !status.nil?
end
on_stderr(&block) click to toggle source

Sets block to be executed each time data is read from child process' standard error

@return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 152
def on_stderr(&block)
  @stderr.on_data(&block)
end
on_stdout(&block) click to toggle source

Sets block to be executed each time data is read from child process' standard output

@return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 145
def on_stdout(&block)
  @stdout.on_data(&block)
end
process(timeout: nil) click to toggle source

Runs a single iteration of the manager's processing loop. It waits until either:

  • data is available in pipes connected to the child process' standard output or error

  • there is pending data to be written and the pipe connected to the child process' standard input is writable

  • a timeout is reached

After the wait, all pending data is read and written.

If all the pipes connected to the child process are closed, it marks the execution as complete and performs cleanup.

@param timeout [nil, Numeric] controls how long this call should wait for data to become available. Waits indefinitely if nil. @return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 120
def process(timeout: nil)
  raise 'Cannot process until the manager is started' unless started?

  writers = [@stdin].reject { |buf| buf.empty? || buf.closed? }
  readers = [@stdout, @stderr].reject(&:closed?)

  if readers.empty? && writers.empty?
    finish
    return
  end

  # Even though the FDs are still open, the child might have exited already
  pid, status = Process.waitpid2(@pid, Process::WNOHANG)
  timeout = 1 if pid

  ready_readers, ready_writers = IO.select(readers, writers, nil, timeout)
  (ready_readers || []).each(&:read_available!)
  (ready_writers || []).each(&:write_available!)

  finish(status) if pid
end
run!() click to toggle source

Starts the process manager and runs it until it finishes

@return [ProcessManager] the process manager itself to allow method chaining

# File lib/smart_proxy_dynflow/process_manager.rb, line 65
def run!
  start! unless started?
  process until done?
  self
end
start!() click to toggle source

Starts the child process. It creates 3 pipes for communicating with the child process and the forks it. The process manager is considered done if the child process cannot be started.

@return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 76
def start!
  in_read,  in_write  = IO.pipe
  out_read, out_write = IO.pipe
  err_read, err_write = IO.pipe

  @stdin.io  = in_write
  @stdout.io = out_read
  @stderr.io = err_read

  @pid = spawn(*@command, :in => in_read, :out => out_write, :err => err_write)
  [in_read, out_write, err_write].each(&:close)
rescue Errno::ENOENT => e
  [in_read, in_write, out_read, out_write, err_read, err_write].each(&:close)
  @pid = -1
  @status = 255
  @stderr.add_data(e.message)
end
started?() click to toggle source

Determines whether the process manager already forked off its child process

@return [true, false] whether the process manager already forked off its child process

# File lib/smart_proxy_dynflow/process_manager.rb, line 97
def started?
  !pid.nil?
end

Private Instance Methods

finish(status = nil) click to toggle source

Makes the process manager finish its run, closing opened FDs and reaping the child process

@return [void]

# File lib/smart_proxy_dynflow/process_manager.rb, line 168
def finish(status = nil)
  close
  if status.nil? && @pid != -1 && !done?
    _pid, status = Process.wait2(@pid)
    @status = status.exitstatus
  elsif status
    @status = status.exitstatus
  end
end