Parent

Included Modules

Class/Module Index [+]

Quicksearch

Ruote::Engine

This class holds the 'engine' name, perhaps 'dashboard' would have been a better name. Anyway, the methods here allow to launch processes and to query about their status. There are also methods for fixing issues with stalled processes or processes stuck in errors.

NOTE : the methods launch and reply are implemented in Ruote::ReceiverMixin

Attributes

context[R]
variables[R]

Public Class Methods

new(worker_or_storage, run=true) click to toggle source

Creates an engine using either worker or storage.

If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.

If a worker instance is given as the first argument and the second argument is true, engine will start the worker and will be able to both manage and run workflows.

# File lib/ruote/engine.rb, line 58
def initialize (worker_or_storage, run=true)

  @context = worker_or_storage.context
  @context.engine = self

  @variables = EngineVariables.new(@context.storage)

  @context.worker.run_in_thread if @context.worker && run
    # launch the worker if there is one
end

Public Instance Methods

add_service(name, path_or_instance, classname=nil, opts=nil) click to toggle source

Adds a service locally (will not get propagated to other workers).

tracer = Tracer.new
@engine.add_service('tracer', tracer)

or

@engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')

This method returns the service instance it just bound.

# File lib/ruote/engine.rb, line 490
def add_service (name, path_or_instance, classname=nil, opts=nil)

  @context.add_service(name, path_or_instance, classname, opts)
end
cancel_expression(fei) click to toggle source

Cancels a segment of process instance. Since expressions are nodes in processes instances, cancelling an expression, will cancel the expression and all its children (the segment of process).

# File lib/ruote/engine.rb, line 106
def cancel_expression (fei)

  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei)
end
cancel_process(wfid) click to toggle source

Given a process identifier (wfid), cancels this process.

# File lib/ruote/engine.rb, line 88
def cancel_process (wfid)

  @context.storage.put_msg('cancel_process', 'wfid' => wfid)
end
configure(config_key, value) click to toggle source

Sets a configuration option. Examples:

# allow remote workflow definitions (for subprocesses or when launching
# processes)
@engine.configure('remote_definition_allowed', true)

# allow ruby_eval
@engine.configure('ruby_eval_allowed', true)
# File lib/ruote/engine.rb, line 504
def configure (config_key, value)

  @context[config_key] = value
end
errors(wfid=nil) click to toggle source

Returns an array of current errors (hashes)

# File lib/ruote/engine.rb, line 214
def errors (wfid=nil)

  wfid.nil? ?
    @context.storage.get_many('errors') :
    @context.storage.get_many('errors', /!#{wfid}$/)
end
join() click to toggle source

Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.

# File lib/ruote/engine.rb, line 286
def join

  worker.join if worker
end
kill_expression(fei) click to toggle source

Like cancel_expression, but :on_cancel attributes (of the expressions) are not triggered.

# File lib/ruote/engine.rb, line 115
def kill_expression (fei)

  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end
kill_process(wfid) click to toggle source

Given a process identifier (wfid), kills this process. Killing is equivalent to cancelling, but when killing, :on_cancel attributes are not triggered.

# File lib/ruote/engine.rb, line 97
def kill_process (wfid)

  @context.storage.put_msg('kill_process', 'wfid' => wfid)
end
load_definition(path) click to toggle source

Loads and parses the process definition at the given path.

# File lib/ruote/engine.rb, line 293
def load_definition (path)

  @context.parser.parse(path)
end
noisy=(b) click to toggle source

A debug helper :

engine.noisy = true

will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.

# File lib/ruote/engine.rb, line 537
def noisy= (b)

  @context.logger.noisy = b
end
participant_list() click to toggle source

Returns a list of Ruote::ParticipantEntry instances.

engine.register_participant :alpha, MyParticipant, 'message' => 'hello'

# interrogate participant list
#
list = engine.participant_list
participant = list.first
p participant.regex
  # => "^alpha$"
p participant.classname
  # => "MyParticipant"
p participant.options
  # => {"message"=>"hello"}

# update participant list
#
participant.regex = '^alfred$'
engine.participant_list = list
# File lib/ruote/engine.rb, line 452
def participant_list

  @context.plist.list
end
participant_list=(pl) click to toggle source

Accepts a list of Ruote::ParticipantEntry instances.

See Engine#participant_list

# File lib/ruote/engine.rb, line 461
def participant_list= (pl)

  @context.plist.list = pl
end
process(wfid) click to toggle source

Returns a ProcessStatus instance describing the current status of a process instance.

# File lib/ruote/engine.rb, line 173
def process (wfid)

  exps = @context.storage.get_many('expressions', /!#{wfid}$/)
  errs = self.errors( wfid )

  return nil if exps.empty? && errs.empty?

  ProcessStatus.new(@context, exps, errs)
end
process_wfids() click to toggle source

Returns a [sorted] list of wfids of the process instances currently running in the engine.

This operation is substantially less costly than Engine#processes (though the 'how substantially' depends on the storage chosen).

# File lib/ruote/engine.rb, line 227
def process_wfids

  @context.storage.ids('expressions').collect { |sfei|
    sfei.split('!').last
  }.uniq.sort
end
processes() click to toggle source

Returns an array of ProcessStatus instances.

WARNING : this is an expensive operation.

Please note, if you're interested only in processes that have errors, Engine#errors is a more efficient mean.

To simply list the wfids of the currently running, Engine#process_wfids is way cheaper to call.

# File lib/ruote/engine.rb, line 193
def processes

  exps = @context.storage.get_many('expressions')
  errs = self.errors

  by_wfid = {}

  exps.each do |exp|
    (by_wfid[exp['fei']['wfid']] ||= [ [], [] ]).first << exp
  end
  errs.each do |err|
    (by_wfid[err['msg']['fei']['wfid']] ||= [ [], [] ]).last << err
  end

  by_wfid.values.collect { |expressions, errors|
    ProcessStatus.new(@context, expressions, errors)
  }
end
re_apply(fei, opts={}) click to toggle source

Re-applies an expression (given via its FlowExpressionId).

That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.

options

:tree is used to completely change the tree of the expression at re_apply

engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])

:fields is used to replace the fields of the workitem at re_apply

engine.re_apply(fei, :fields => { 'customer' => 'bob' })

:merge_in_fields is used to add / override fields

engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
# File lib/ruote/engine.rb, line 165
def re_apply (fei, opts={})

  @context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts)
end
register(*args, &block) click to toggle source

A shorter version of register_participant

engine.register 'alice', MailParticipant, :target => 'alice@example.com'

or a block registering mechanism.

engine.register do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
  catchall ParticipantCharlie, 'flavour' => 'coconut'
end

Originally implemented in ruote-kit by Torsten Schoenebaum.

# File lib/ruote/engine.rb, line 407
def register (*args, &block)

  if args.size > 0
    register_participant(*args, &block)
  else
    proxy = ParticipantRegistrationProxy.new(self)
    block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  end
end
register_participant(regex, participant=nil, opts={}, &block) click to toggle source

Registers a participant in the engine. Returns the participant instance.

Some examples :

require 'ruote/part/hash_participant'
alice = engine.register_participant 'alice', Ruote::HashParticipant
  # register an in-memory (hash) store for Alice's workitems

engine.register_participant 'compute_sum' do |wi|
  wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
    s + c * v # sum + count * value
  end
  # a block participant implicitely replies to the engine immediately
end

class MyParticipant
  def initialize (name)
    @name = name
  end
  def consume (workitem)
    workitem.fields['rocket_name'] = @name
    send_to_the_moon(workitem)
  end
  def cancel (fei, flavour)
    # do nothing
  end
end
engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')

'stateless' participants are preferred over 'stateful' ones

Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be in other ruby runtimes. This implies that if you have registered a participant instance (instead of passing its classname and options), that participant will only run in the worker 'embedded' in the engine where it was registered... Let me rephrase it, participants instantiated at registration time (and that includes block participants) only runs in one worker, always the same.

'stateless' participants, instantiated at each dispatch, are preferred. Any worker can handle them.

Block participants are still fine for demos (where the worker is included in the engine (see all the quickstarts). And small engines with 1 worker are not that bad, not everybody is building huge systems).

Here is a 'stateless' participant example :

class MyStatelessParticipant
  def initialize (opts)
    @opts = opts
  end
  def consume (workitem)
    workitem.fields['rocket_name'] = @opts['name']
    send_to_the_moon(workitem)
  end
  def cancel (fei, flavour)
    # do nothing
  end
end

engine.register_participant(
  'moon', MyStatelessParticipant, 'name' => 'saturn5')

Remember that the options (the hash that follows the class name), must be serialisable via JSON.

require_path and load_path

It's OK to register a participant by passing it's full classname as a String.

engine.register_participant(
  'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
engine.register_participant(
  'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')

Note the option load_path / require_path that point to the ruby file containing the participant implementation. 'require' will load and eval the ruby code only once, 'load' each time.

# File lib/ruote/engine.rb, line 381
def register_participant (regex, participant=nil, opts={}, &block)

  pa = @context.plist.register(regex, participant, opts, block)

  @context.storage.put_msg(
    'participant_registered',
    'regex' => regex.to_s,
    'engine_worker_only' => (pa != nil))

  pa
end
replay_at_error(err) click to toggle source

Replays at a given error (hopefully you fixed the cause of the error before replaying...)

# File lib/ruote/engine.rb, line 124
def replay_at_error (err)

  msg = err.msg.dup
  action = msg.delete('action')

  msg['replay_at_error'] = true
    # just an indication

  if msg['tree'] && fei = msg['fei']
    #
    # nukes the expression in case of [re]apply
    #
    exp = Ruote::Exp::FlowExpression.fetch(@context, fei)
    exp.unpersist_or_raise if exp
  end

  @context.storage.delete(err.to_h) # remove error

  @context.storage.put_msg(action, msg) # trigger replay
end
shutdown() click to toggle source

Shuts down the engine, mostly passes the shutdown message to the other services and hope they'll shut down properly.

# File lib/ruote/engine.rb, line 237
def shutdown

  @context.shutdown
end
storage() click to toggle source

Returns the storage this engine works with passed at engine initialization.

# File lib/ruote/engine.rb, line 72
def storage

  @context.storage
end
storage_participant() click to toggle source

A convenience method for

sp = Ruote::StorageParticipant.new(engine)

simply do

sp = engine.storage_participant
# File lib/ruote/engine.rb, line 474
def storage_participant

  @storage_participant ||= Ruote::StorageParticipant.new(self)
end
unregister(name_or_participant) click to toggle source
unregister_participant(name_or_participant) click to toggle source

Removes/unregisters a participant from the engine.

# File lib/ruote/engine.rb, line 419
def unregister_participant (name_or_participant)

  re = @context.plist.unregister(name_or_participant)

  raise(ArgumentError.new('participant not found')) unless re

  @context.storage.put_msg(
    'participant_unregistered',
    'regex' => re.to_s)
end
Also aliased as: unregister
wait_for(*items) click to toggle source

This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.

WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn't see events handled by 'other' workers.

This method is only useful for test/quickstart/examples environments.

engine.wait_for(:alpha)
  # will make the current thread block until a workitem is delivered
  # to the participant named 'alpha'

engine.wait_for('123432123-9043')
  # will make the current thread block until the processed whose
  # wfid is given (String) terminates or produces an error.

engine.wait_for(5)
  # will make the current thread block until 5 messages have been
  # processed on the workqueue...

engine.wait_for(:empty)
  # will return as soon as the engine/storage is empty, ie as soon
  # as there are no more processes running in the engine (no more
  # expressions placed in the storage)

It's OK to wait for multiple wfids :

engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
# File lib/ruote/engine.rb, line 272
def wait_for (*items)

  logger = @context['s_logger']

  raise(
    "can't wait_for, there is no logger that responds to that call"
  ) unless logger.respond_to?(:wait_for)

  logger.wait_for(items)
end
worker() click to toggle source

Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (and the worker is running somewhere else (hopefully)).

# File lib/ruote/engine.rb, line 81
def worker

  @context.worker
end
workitem(fei) click to toggle source

A convenience methods for advanced users (like Oleg).

Given a fei (flow expression id), fetches the workitem as stored in the expression with that fei. This is the "applied workitem", if the workitem is currently handed to a participant, this method will return the workitem as applied, not the workitem as saved by the participant/user in whatever worklist it uses. If you need that workitem, do the vanilla thing and ask it to the [storage] participant or its worklist.

The fei might be a string fei (result of fei.to_storage_id), a FlowExpressionId instance or a hash.

# File lib/ruote/engine.rb, line 522
def workitem (fei)

  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))

  Ruote::Workitem.new(fexp.h.applied_workitem)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.