Parent

Class/Module Index [+]

Quicksearch

Ruote::Worker

Workers fetch 'msgs' and 'schedules' from the storage and process them.

Read more at ruote.rubyforge.org/configuration.html

Constants

DISP_ACTIONS
EXP_ACTIONS
PROC_ACTIONS

'apply' is comprised in 'launch' 'receive' is a ParticipantExpression alias for 'reply'

Attributes

context[R]
run_thread[R]
running[R]
storage[R]

Public Class Methods

new(storage) click to toggle source
# File lib/ruote/worker.rb, line 50
def initialize (storage)

  @subscribers = []
    # must be ready before the storage is created
    # services like Logger to subscribe to the worker

  @storage = storage
  @context = Ruote::Context.new(storage, self)

  @last_time = Time.at(0.0).utc # 1970...

  @running = true
  @run_thread = nil

  @msgs = []
  @sleep_time = 0.000
end

Public Instance Methods

inactive?() click to toggle source

Returns true if the engine system is inactive, ie if all the process instances are terminated or are stuck in an error.

NOTE : for now, if a branch of a process is in error while another is still running, this method will consider the process instance inactive (and it will return true if all the processes are considered inactive).

# File lib/ruote/worker.rb, line 120
def inactive?

  # the cheaper tests first

  return false if @msgs.size > 0
  return false unless @context.storage.empty?('schedules')
  return false unless @context.storage.empty?('msgs')

  wfids = @context.storage.get_many('expressions').collect { |exp|
    exp['fei']['wfid']
  }

  error_wfids = @context.storage.get_many('errors').collect { |err|
    err['fei']['wfid']
  }

  (wfids - error_wfids == [])
end
join() click to toggle source

Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).

# File lib/ruote/worker.rb, line 91
def join

  @run_thread.join if @run_thread
end
run() click to toggle source

Runs the worker in the current thread. See run_in_thread for running in a dedicated thread.

# File lib/ruote/worker.rb, line 71
def run

  step while @running
end
run_in_thread() click to toggle source

Triggers the run method of the worker in a dedicated thread.

# File lib/ruote/worker.rb, line 78
def run_in_thread

  Thread.abort_on_exception = true
    # TODO : remove me at some point

  @running = true

  @run_thread = Thread.new { run }
end
shutdown() click to toggle source
# File lib/ruote/worker.rb, line 101
def shutdown

  @running = false

  return unless @run_thread

  begin
    @run_thread.join
  rescue Exception => e
  end
end
subscribe(actions, subscriber) click to toggle source
# File lib/ruote/worker.rb, line 96
def subscribe (actions, subscriber)

  @subscribers << [ actions, subscriber ]
end

Protected Instance Methods

cancel_process(msg) click to toggle source
# File lib/ruote/worker.rb, line 313
def cancel_process (msg)

  root = @storage.find_root_expression(msg['wfid'])

  return unless root

  flavour = (msg['action'] == 'kill_process') ? 'kill' : nil

  @storage.put_msg(
    'cancel',
    'fei' => root['fei'],
    'wfid' => msg['wfid'], # indicates this was triggered by cancel_process
    'flavour' => flavour)
end
Also aliased as: kill_process
cannot_handle(msg) click to toggle source

Should always return false. Except when the message is a 'dispatch' and it's for a participant only available to an 'engine_worker' (block participants, stateful participants)

# File lib/ruote/worker.rb, line 263
def cannot_handle (msg)

  return false if msg['action'] != 'dispatch'

  @context.engine.nil? && msg['for_engine_worker?']
end
kill_process(msg) click to toggle source
Alias for: cancel_process
launch(msg) click to toggle source

Works for both the 'launch' and the 'apply' msgs.

# File lib/ruote/worker.rb, line 272
def launch (msg)

  tree = msg['tree']
  variables = msg['variables']

  exp_class = @context.expmap.expression_class(tree.first)

  # msg['wfid'] only : it's a launch
  # msg['fei'] : it's a sub launch (a supplant ?)

  exp_hash = {
    'fei' => msg['fei'] || {
      'engine_id' => @context.engine_id,
      'wfid' => msg['wfid'],
      'sub_wfid' => msg['sub_wfid'],
      'expid' => '0' },
    'parent_id' => msg['parent_id'],
    'original_tree' => tree,
    'variables' => variables,
    'applied_workitem' => msg['workitem'],
    'forgotten' => msg['forgotten']
  }

  if not exp_class

    #exp_class, tree, part = lookup_subprocess_or_participant(exp_hash)
    #exp_hash['participant'] = part if part
    exp_class = Ruote::Exp::RefExpression

  elsif msg['action'] == 'launch' && exp_class == Ruote::Exp::DefineExpression
    def_name, tree = Ruote::Exp::DefineExpression.reorganize(tree)
    variables[def_name] = [ '0', tree ] if def_name
    exp_class = Ruote::Exp::SequenceExpression
  end

  exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree))

  exp.initial_persist
  exp.do_apply
end
notify(msg) click to toggle source
# File lib/ruote/worker.rb, line 249
def notify (msg)

  @subscribers.each do |actions, subscriber|

    if actions == :all || actions.include?(msg['action'])
      subscriber.notify(msg)
    end
  end
end
process(msg) click to toggle source
# File lib/ruote/worker.rb, line 207
def process (msg)

  return false if cannot_handle(msg)

  return false unless @storage.reserve(msg)

  begin

    action = msg['action']

    if msg['tree']
      #
      # warning here, it could be a reply, with a 'tree' key...

      launch(msg)

    elsif EXP_ACTIONS.include?(action)

      Ruote::Exp::FlowExpression.do_action(@context, msg)

    elsif DISP_ACTIONS.include?(action)

      @context.dispatch_pool.handle(msg)

    elsif PROC_ACTIONS.include?(action)

      self.send(action, msg)

    #else
      # msg got deleted, might still be interesting for a subscriber
    end

    notify(msg)

  rescue Exception => exception

    @context.error_handler.msg_handle(msg, exception)
  end

  true
end
step() click to toggle source
# File lib/ruote/worker.rb, line 141
def step

  now = Time.now.utc
  delta = now - @last_time

  if delta >= 0.8
    #
    # at most once per second, deal with 'ats' and 'crons'

    @last_time = now

    @storage.get_schedules(delta, now).each do |sche|
      trigger(sche)
    end
  end

  # msgs

  @msgs = @storage.get_msgs if @msgs.empty?

  processed = 0
  collisions = 0

  while msg = @msgs.shift

    r = process(msg)

    if r != false
      processed += 1
    else
      collisions += 1
    end

    if collisions > 2
      @msgs = @msgs[(@msgs.size / 2)..-1] || []
    end

    #@msgs.concat(@storage.get_local_msgs)

    #print r == false ? '*' : '.'

    break if Time.now.utc - @last_time >= 0.8
  end

  #p processed

  if processed == 0
    @sleep_time += 0.001
    @sleep_time = 0.499 if @sleep_time > 0.499
    sleep(@sleep_time)
  else
    @sleep_time = 0.000
  end
end
trigger(schedule) click to toggle source
# File lib/ruote/worker.rb, line 196
def trigger (schedule)

  msg = Ruote.fulldup(schedule['msg'])

  return false unless @storage.reserve(schedule)

  @storage.put_msg(msg.delete('action'), msg)

  true
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.