Class Ruote::Engine
In: lib/ruote/engine.rb
Parent: Object

This class holds the ‘engine’ name, though ‘dashboard’ might have been a better one. Its methods let you launch processes and query their status. There are also methods for fixing stalled processes or processes stuck in errors.

NOTE : the methods launch and reply are implemented in Ruote::ReceiverMixin

Included Modules

ReceiverMixin

Attributes

context  [R] 
variables  [R] 

Public Class methods

Creates an engine from either a worker or a storage.

If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.

If a worker instance is given as the first argument and the second argument is true, the engine will start the worker and will be able to both manage and run workflows.

Public Instance methods

Adds a service locally (will not get propagated to other workers).

  tracer = Tracer.new
  @engine.add_service('tracer', tracer)

or

  @engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')

This method returns the service instance it just bound.

Cancels a segment of a process instance. Since expressions are nodes in process instances, cancelling an expression cancels that expression and all its children (the segment of the process).
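To picture what "the expression and all its children" covers, here is a minimal sketch in plain Ruby. It assumes a process tree shaped as [name, attributes, children], the same shape as the :tree option shown for re_apply. The constant PROCESS_TREE and the method segment_names are hypothetical illustration names, not part of ruote, and the sketch only lists the affected nodes; it does not model how the engine performs the cancellation.

```ruby
# Hypothetical process tree, each node being [name, attributes, children].
PROCESS_TREE = ['sequence', {}, [
  ['participant', { 'ref' => 'alice' }, []],
  ['concurrence', {}, [
    ['participant', { 'ref' => 'bob' }, []],
    ['participant', { 'ref' => 'charly' }, []]
  ]]
]].freeze

# Returns the names of the given node and of all its descendants,
# that is, the segment that cancelling this node would cover.
def segment_names(tree)
  name, _attributes, children = tree
  [name] + children.flat_map { |child| segment_names(child) }
end

concurrence = PROCESS_TREE[2][1]
p segment_names(concurrence)
  # => ["concurrence", "participant", "participant"]
```

Cancelling the 'concurrence' node in this model covers bob and charly, but leaves alice and the enclosing 'sequence' untouched.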

Given a process identifier (wfid), cancels this process.

Sets a configuration option. Examples:

  # allow remote workflow definitions (for subprocesses or when launching
  # processes)
  @engine.configure('remote_definition_allowed', true)

  # allow ruby_eval
  @engine.configure('ruby_eval_allowed', true)

Returns an array of current errors (hashes)

Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.

Like cancel_expression, but :on_cancel attributes (of the expressions) are not triggered.

Given a process identifier (wfid), kills this process. Killing is equivalent to cancelling, but when killing, :on_cancel attributes are not triggered.

Loads and parses the process definition at the given path.

A debug helper :

  engine.noisy = true

will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.

Returns a list of Ruote::ParticipantEntry instances.

  engine.register_participant :alpha, MyParticipant, 'message' => 'hello'

  # interrogate participant list
  #
  list = engine.participant_list
  participant = list.first
  p participant.regex
    # => "^alpha$"
  p participant.classname
    # => "MyParticipant"
  p participant.options
    # => {"message"=>"hello"}

  # update participant list
  #
  participant.regex = '^alfred$'
  engine.participant_list = list
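The regex of an entry decides which participant names the entry answers to. The sketch below models that lookup in plain Ruby, assuming that names are matched against each regex in list order and that the first match wins. ENTRIES, lookup and 'MoonParticipant' are hypothetical names used for illustration only; this is not the engine's lookup code.

```ruby
# Hypothetical stand-in for a participant list: [regex string, classname].
ENTRIES = [
  ['^alpha$', 'MyParticipant'],
  ['^moon-.+', 'MoonParticipant']
].freeze

# Returns the classname of the first entry whose regex matches the name,
# or nil when no entry matches.
def lookup(entries, participant_name)
  entry = entries.find { |regex, _classname| Regexp.new(regex).match?(participant_name) }
  entry && entry.last
end

p lookup(ENTRIES, 'alpha')    # => "MyParticipant"
p lookup(ENTRIES, 'alphabet') # => nil
p lookup(ENTRIES, 'moon-1')   # => "MoonParticipant"

# After changing the first regex to '^alfred$', 'alpha' no longer matches.
updated = [['^alfred$', 'MyParticipant']] + ENTRIES.drop(1)
p lookup(updated, 'alpha')    # => nil
p lookup(updated, 'alfred')   # => "MyParticipant"
```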

Returns a ProcessStatus instance describing the current status of a process instance.

Returns a [sorted] list of wfids of the process instances currently running in the engine.

This operation is substantially less costly than Engine#processes (though the ‘how substantially’ depends on the storage chosen).

Returns an array of ProcessStatus instances.

WARNING : this is an expensive operation.

Please note, if you're interested only in processes that have errors, Engine#errors is a more efficient means.

To simply list the wfids of the currently running processes, Engine#process_wfids is much cheaper to call.

Re-applies an expression (given via its FlowExpressionId).

That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.

options

:tree is used to completely change the tree of the expression at re_apply

  engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])

:fields is used to replace the fields of the workitem at re_apply

  engine.re_apply(fei, :fields => { 'customer' => 'bob' })

:merge_in_fields is used to add / override fields

  engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
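The difference between :fields and :merge_in_fields can be modelled with plain hashes. This is only a sketch of the behaviour described above, not the engine's implementation; apply_field_options is a hypothetical helper and the 'total' field is made up for the example.

```ruby
# Models how the workitem fields would look after re_apply, given the
# fields currently held by the expression and the options passed.
def apply_field_options(current_fields, options)
  if options.key?(:fields)
    options[:fields].dup                                    # replace everything
  elsif options.key?(:merge_in_fields)
    current_fields.merge(options[:merge_in_fields])         # add / override
  else
    current_fields.dup                                      # leave fields as they are
  end
end

current = { 'customer' => 'alice', 'total' => 10 }

p apply_field_options(current, :fields => { 'customer' => 'bob' })
  # => {"customer"=>"bob"}
p apply_field_options(current, :merge_in_fields => { 'customer' => 'bob' })
  # => {"customer"=>"bob", "total"=>10}
```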

A shorter version of register_participant

  engine.register 'alice', MailParticipant, :target => 'alice@example.com'

or a block registering mechanism.

  engine.register do
    alpha 'Participants::Alpha', 'flavour' => 'vanilla'
    participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
    catchall ParticipantCharlie, 'flavour' => 'coconut'
  end

Originally implemented in ruote-kit by Torsten Schoenebaum.

Registers a participant in the engine. Returns the participant instance.

Some examples :

  require 'ruote/part/hash_participant'
  alice = engine.register_participant 'alice', Ruote::HashParticipant
    # register an in-memory (hash) store for Alice's workitems

  engine.register_participant 'compute_sum' do |wi|
    wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
      s + c * v # sum + count * value
    end
    # a block participant implicitly replies to the engine immediately
  end

  class MyParticipant
    def initialize (name)
      @name = name
    end
    def consume (workitem)
      workitem.fields['rocket_name'] = @name
      send_to_the_moon(workitem)
    end
    def cancel (fei, flavour)
      # do nothing
    end
  end
  engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')
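The sum computed by the 'compute_sum' block participant above can be tried outside the engine. The sketch assumes that the 'articles' field holds an array of [count, value] pairs; the source does not state the shape of that field, so treat it as an assumption. compute_sum here is a plain method taking a fields hash, not a registered participant.

```ruby
# Same inject as in the block participant, applied to a plain fields hash.
# Assumes fields['articles'] is an array of [count, value] pairs.
def compute_sum(fields)
  fields['articles'].inject(0) do |sum, (count, value)|
    sum + count * value
  end
end

fields = { 'articles' => [[2, 10], [1, 5]] }
fields['sum'] = compute_sum(fields)
p fields['sum'] # => 25
```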

‘stateless’ participants are preferred over ‘stateful’ ones

Ruote 2.1 works with one storage and one or more workers, and the workers may live in other Ruby runtimes. This implies that if you register a participant instance (instead of passing its classname and options), that participant will only run in the worker ‘embedded’ in the engine where it was registered. In other words, participants instantiated at registration time (block participants included) only run in one worker, always the same one.

‘stateless’ participants, instantiated at each dispatch, are preferred. Any worker can handle them.

Block participants are still fine for demos, where the worker is included in the engine (see all the quickstarts). Small engines with one worker are fine too; not everybody is building huge systems.

Here is a ‘stateless’ participant example :

  class MyStatelessParticipant
    def initialize (opts)
      @opts = opts
    end
    def consume (workitem)
      workitem.fields['rocket_name'] = @opts['name']
      send_to_the_moon(workitem)
    end
    def cancel (fei, flavour)
      # do nothing
    end
  end

  engine.register_participant(
    'moon', MyStatelessParticipant, 'name' => 'saturn5')

Remember that the options (the hash that follows the class name) must be serialisable via JSON.
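Why the options must survive JSON can be sketched as follows: a registration made of a classname and an options hash is stored as JSON, and a fresh participant is built from it at dispatch time. This is a simplified model and not ruote's storage or dispatch code; RocketParticipant, serialise_registration and instantiate are hypothetical names.

```ruby
require 'json'

# Hypothetical stateless participant, reduced to what the sketch needs.
class RocketParticipant
  def initialize(opts)
    @opts = opts
  end

  def rocket_name
    @opts['name']
  end
end

# What a storage might keep: only JSON, no live Ruby objects.
def serialise_registration(classname, options)
  JSON.generate({ 'classname' => classname, 'options' => options })
end

# What any worker could do at dispatch time: build a fresh instance.
def instantiate(registration_json)
  registration = JSON.parse(registration_json)
  Object.const_get(registration['classname']).new(registration['options'])
end

stored = serialise_registration('RocketParticipant', { 'name' => 'saturn5' })
p instantiate(stored).rocket_name # => "saturn5"

# Symbol keys come back as strings after a JSON round trip.
p JSON.parse(JSON.generate({ :flavour => 'peach' })) # => {"flavour"=>"peach"}
```

Since symbol keys turn into strings on the way through JSON, a participant written against this model should read its options with string keys.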

require_path and load_path

It's OK to register a participant by passing its full classname as a String.

  engine.register_participant(
    'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
  engine.register_participant(
    'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')

Note the load_path / require_path options, which point to the Ruby file containing the participant implementation. ‘require’ will load and eval the Ruby code only once, ‘load’ will do so each time.
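The difference between ‘require’ and ‘load’ is plain Ruby behaviour and can be observed without the engine. The sketch writes a small file to a temporary directory and counts how often it gets evaluated. The file name, the global counter and count_evaluations are hypothetical, chosen for the illustration.

```ruby
require 'tmpdir'

$evaluations = 0 # incremented each time the generated file is evaluated

# Requires a file twice, then loads it twice, and returns the evaluation
# count after the requires and after the loads.
def count_evaluations
  $evaluations = 0
  Dir.mktmpdir do |dir|
    path = File.join(dir, 'dec_participant_sketch.rb')
    File.write(path, "$evaluations += 1\n")

    2.times { require path } # evaluated only the first time
    after_require = $evaluations

    2.times { load path }    # evaluated every time
    [after_require, $evaluations]
  end
end

p count_evaluations # => [1, 3]
```

With load_path, a participant file edited on disk would therefore be picked up again on the next load, while with require_path the first evaluated version stays in place for the life of the Ruby process.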

Replays at a given error (hopefully you fixed the cause of the error before replaying…)

Shuts down the engine. This mostly passes the shutdown message to the other services and hopes they'll shut down properly.

Returns the storage this engine works with (the one passed at engine initialization).

A convenience method for

  sp = Ruote::StorageParticipant.new(engine)

simply do

  sp = engine.storage_participant

unregister(name_or_participant)

Removes/unregisters a participant from the engine.

This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.

WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn't see events handled by ‘other’ workers.

This method is only useful for test/quickstart/examples environments.

  engine.wait_for(:alpha)
    # will make the current thread block until a workitem is delivered
    # to the participant named 'alpha'

  engine.wait_for('123432123-9043')
    # will make the current thread block until the process whose
    # wfid is given (String) terminates or produces an error.

  engine.wait_for(5)
    # will make the current thread block until 5 messages have been
    # processed on the workqueue...

  engine.wait_for(:empty)
    # will return as soon as the engine/storage is empty, ie as soon
    # as there are no more processes running in the engine (no more
    # expressions placed in the storage)

It's OK to wait for multiple wfids :

  engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')

Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (the worker is then, hopefully, running somewhere else).

A convenience method for advanced users (like Oleg).

Given a fei (flow expression id), fetches the workitem as stored in the expression with that fei. This is the "applied workitem": if the workitem is currently handed to a participant, this method will return the workitem as applied, not the workitem as saved by the participant/user in whatever worklist it uses. If you need that workitem, do the vanilla thing and ask the [storage] participant or its worklist for it.

The fei might be a string fei (result of fei.to_storage_id), a FlowExpressionId instance or a hash.
