Object
This class holds the 'engine' name; perhaps 'dashboard' would have been a better name. The methods here let you launch processes and query their status. There are also methods for fixing stalled processes or processes stuck in errors.
NOTE : the methods launch and reply are implemented in Ruote::ReceiverMixin
Creates an engine using either worker or storage.
If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.
If a worker instance is given as the first argument and the second argument is true, engine will start the worker and will be able to both manage and run workflows.
# File lib/ruote/engine.rb, line 58
def initialize (worker_or_storage, run=true)
  @context = worker_or_storage.context
  @context.engine = self
  @variables = EngineVariables.new(@context.storage)
  @context.worker.run_in_thread if @context.worker && run
    # launch the worker if there is one
end
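The guard on the last line of the constructor can be looked at in isolation. The following is a minimal sketch, not ruote code: `SketchWorker`, `SketchContext` and `start_worker_if_any` are hypothetical stand-ins, used only to show when the worker would be started.

```ruby
# Hypothetical stand-ins for a ruote worker and context (not the real classes).
SketchWorker = Struct.new(:started) do
  def run_in_thread
    self.started = true # the real worker would start running in a thread here
  end
end

SketchContext = Struct.new(:worker)

# Mirrors the guard used in Engine#initialize:
# start the worker only if there is one and +run+ is true.
def start_worker_if_any(context, run = true)
  context.worker.run_in_thread if context.worker && run
  context
end

managed      = start_worker_if_any(SketchContext.new(SketchWorker.new(false)))
idle         = start_worker_if_any(SketchContext.new(SketchWorker.new(false)), false)
storage_only = start_worker_if_any(SketchContext.new(nil))

p managed.worker.started # worker present, run left at its default (true)
p idle.worker.started    # worker present, run set to false
p storage_only.worker    # storage-only case: there is no worker to start
```

In the storage-only case nothing is started, which matches the description above: such an engine can manage processes but does not run them.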
Adds a service locally (will not get propagated to other workers).
tracer = Tracer.new
@engine.add_service('tracer', tracer)
or
@engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
This method returns the service instance it just bound.
# File lib/ruote/engine.rb, line 490
def add_service (name, path_or_instance, classname=nil, opts=nil)
  @context.add_service(name, path_or_instance, classname, opts)
end
Cancels a segment of a process instance. Since expressions are nodes in process instances, cancelling an expression cancels that expression and all of its children (the segment of the process).
# File lib/ruote/engine.rb, line 106
def cancel_expression (fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei)
end
Given a process identifier (wfid), cancels this process.
# File lib/ruote/engine.rb, line 88
def cancel_process (wfid)
  @context.storage.put_msg('cancel_process', 'wfid' => wfid)
end
Sets a configuration option. Examples:
# allow remote workflow definitions (for subprocesses or when launching
# processes)
@engine.configure('remote_definition_allowed', true)
# allow ruby_eval
@engine.configure('ruby_eval_allowed', true)
# File lib/ruote/engine.rb, line 504
def configure (config_key, value)
  @context[config_key] = value
end
Returns an array of current errors (hashes)
# File lib/ruote/engine.rb, line 214
def errors (wfid=nil)
  wfid.nil? ?
    @context.storage.get_many('errors') :
    @context.storage.get_many('errors', /!#{wfid}$/)
end
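When a wfid is given, the listing above restricts the lookup with the pattern `/!#{wfid}$/`, that is, ids ending in `!` followed by the wfid. A small sketch of that filter on plain strings follows; the id layout in `error_ids` and the helper name `ids_for` are assumptions made for illustration, since the real id format depends on the storage.

```ruby
# Hypothetical error document ids; the real layout depends on the storage.
error_ids = [
  'err_0_0!abc!20100612-bezerijozo',
  'err_0_1!def!20100612-yakisoba',
  'err_0_0_2!ghi!20100612-bezerijozo'
]

# Same pattern as in Engine#errors: keep the ids that end with "!<wfid>".
def ids_for(ids, wfid)
  ids.grep(/!#{wfid}$/)
end

p ids_for(error_ids, '20100612-bezerijozo') # the two ids of that process
p ids_for(error_ids, '20100612-unknown')    # no match, empty array
```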
Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.
# File lib/ruote/engine.rb, line 286
def join
  worker.join if worker
end
Like cancel_expression, but :on_cancel attributes (of the expressions) are not triggered.
# File lib/ruote/engine.rb, line 115
def kill_expression (fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end
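Both cancel_expression and kill_expression place a 'cancel' message in the storage; the only difference is the extra 'flavour' => 'kill' entry. The sketch below records such messages in memory to make that visible. `SketchStorage` and `SketchFei` are hypothetical stand-ins, and storing the action under an 'action' key is an assumption (suggested by replay_at_error, which reads msg['action']).

```ruby
# Hypothetical in-memory storage that only records the messages it is given.
class SketchStorage
  attr_reader :msgs

  def initialize
    @msgs = []
  end

  # Assumption: the action is kept in the message under the 'action' key.
  def put_msg(action, options)
    @msgs << options.merge('action' => action)
  end
end

# Hypothetical expression id; any object responding to #to_h is converted.
class SketchFei
  def initialize(wfid, expid)
    @wfid = wfid
    @expid = expid
  end

  def to_h
    { 'wfid' => @wfid, 'expid' => @expid }
  end
end

def cancel_expression(storage, fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  storage.put_msg('cancel', 'fei' => fei)
end

def kill_expression(storage, fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end

storage = SketchStorage.new
fei = SketchFei.new('20100612-bezerijozo', '0_0')
cancel_expression(storage, fei)
kill_expression(storage, fei)
storage.msgs.each { |msg| p msg }
```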
Given a process identifier (wfid), kills this process. Killing is equivalent to cancelling, but when killing, :on_cancel attributes are not triggered.
# File lib/ruote/engine.rb, line 97
def kill_process (wfid)
  @context.storage.put_msg('kill_process', 'wfid' => wfid)
end
Loads and parses the process definition at the given path.
# File lib/ruote/engine.rb, line 293
def load_definition (path)
  @context.parser.parse(path)
end
A debug helper :
engine.noisy = true
will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.
# File lib/ruote/engine.rb, line 537
def noisy= (b)
  @context.logger.noisy = b
end
Returns a list of Ruote::ParticipantEntry instances.
engine.register_participant :alpha, MyParticipant, 'message' => 'hello'
# interrogate participant list
#
list = engine.participant_list
participant = list.first
p participant.regex
# => "^alpha$"
p participant.classname
# => "MyParticipant"
p participant.options
# => {"message"=>"hello"}
# update participant list
#
participant.regex = '^alfred$'
engine.participant_list = list
# File lib/ruote/engine.rb, line 452
def participant_list
  @context.plist.list
end
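The example shows that a participant registered as :alpha is listed with the anchored pattern "^alpha$". The sketch below illustrates what such a pattern accepts, assuming that participant names are matched against the stored regex source; `handles?` is a hypothetical helper, not part of ruote.

```ruby
# Hypothetical helper: does the stored regex source match a participant name?
def handles?(regex_source, participant_name)
  !!(Regexp.new(regex_source) =~ participant_name)
end

p handles?('^alpha$', 'alpha')    # exact name
p handles?('^alpha$', 'alphabet') # longer name, rejected by the $ anchor
p handles?('^alfred$', 'alpha')   # after the update to '^alfred$'
p handles?('^alfred$', 'alfred')
```

Because of the ^ and $ anchors, changing the regex to '^alfred$' as in the example means the entry no longer matches 'alpha' at all.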
Accepts a list of Ruote::ParticipantEntry instances.
# File lib/ruote/engine.rb, line 461
def participant_list= (pl)
  @context.plist.list = pl
end
Returns a ProcessStatus instance describing the current status of a process instance.
# File lib/ruote/engine.rb, line 173
def process (wfid)
  exps = @context.storage.get_many('expressions', /!#{wfid}$/)
  errs = self.errors( wfid )
  return nil if exps.empty? && errs.empty?
  ProcessStatus.new(@context, exps, errs)
end
Returns a [sorted] list of wfids of the process instances currently running in the engine.
This operation is substantially less costly than Engine#processes (though the 'how substantially' depends on the storage chosen).
# File lib/ruote/engine.rb, line 227
def process_wfids
  @context.storage.ids('expressions').collect { |sfei|
    sfei.split('!').last
  }.uniq.sort
end
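The transformation in process_wfids works on expression ids alone, which is why it avoids fetching the expressions themselves. A minimal sketch on plain strings follows; the sample ids and the helper name `wfids_from` are made up for illustration, the only property relied on being that the wfid is the last '!'-separated segment.

```ruby
# Same chain as in Engine#process_wfids, applied to an array of id strings.
def wfids_from(expression_ids)
  expression_ids.collect { |sfei| sfei.split('!').last }.uniq.sort
end

# Hypothetical expression ids: three expressions belonging to two processes.
ids = [
  '0_0!abc!20100612-yakisoba',
  '0!def!20100612-bezerijozo',
  '0_1!ghi!20100612-yakisoba'
]

p wfids_from(ids) # => ["20100612-bezerijozo", "20100612-yakisoba"]
```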
Returns an array of ProcessStatus instances.
WARNING : this is an expensive operation.
Please note: if you're only interested in processes that have errors, Engine#errors is a more efficient means.
To simply list the wfids of the currently running processes, Engine#process_wfids is much cheaper to call.
# File lib/ruote/engine.rb, line 193
def processes
  exps = @context.storage.get_many('expressions')
  errs = self.errors
  by_wfid = {}
  exps.each do |exp|
    (by_wfid[exp['fei']['wfid']] ||= [ [], [] ]).first << exp
  end
  errs.each do |err|
    (by_wfid[err['msg']['fei']['wfid']] ||= [ [], [] ]).last << err
  end
  by_wfid.values.collect { |expressions, errors|
    ProcessStatus.new(@context, expressions, errors)
  }
end
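The grouping step in the listing builds, per wfid, a pair of arrays: expressions first, errors last. The sketch below reproduces that step on plain hashes. The sample documents are made up and only carry the keys the grouping reads; `group_by_wfid` is a hypothetical helper name.

```ruby
# Groups expression and error hashes by wfid, as Engine#processes does
# before building one ProcessStatus per pair.
def group_by_wfid(exps, errs)
  by_wfid = {}
  exps.each do |exp|
    (by_wfid[exp['fei']['wfid']] ||= [[], []]).first << exp
  end
  errs.each do |err|
    (by_wfid[err['msg']['fei']['wfid']] ||= [[], []]).last << err
  end
  by_wfid
end

# Made-up documents, reduced to the keys the grouping needs.
exps = [
  { 'fei' => { 'wfid' => 'A', 'expid' => '0' } },
  { 'fei' => { 'wfid' => 'A', 'expid' => '0_0' } },
  { 'fei' => { 'wfid' => 'B', 'expid' => '0' } }
]
errs = [
  { 'msg' => { 'fei' => { 'wfid' => 'A', 'expid' => '0_0' } } }
]

group_by_wfid(exps, errs).each do |wfid, (expressions, errors)|
  puts "#{wfid}: #{expressions.size} expression(s), #{errors.size} error(s)"
end
```

Every expression in the storage has to be fetched for this, which is where the cost mentioned in the warning comes from.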
Re-applies an expression (given via its FlowExpressionId).
That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.
:tree is used to completely change the tree of the expression at re_apply
engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])
:fields is used to replace the fields of the workitem at re_apply
engine.re_apply(fei, :fields => { 'customer' => 'bob' })
:merge_in_fields is used to add / override fields
engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
# File lib/ruote/engine.rb, line 165
def re_apply (fei, opts={})
  @context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts)
end
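The difference between :fields (replace) and :merge_in_fields (add / override) can be shown on plain hashes. This is only a sketch of the semantics described above, not the code ruote runs when re-applying; `fields_after_re_apply` is a hypothetical helper, and what happens when both options are passed together is not covered by the text (the sketch arbitrarily gives :fields precedence).

```ruby
# Hypothetical helper illustrating the documented semantics of the options.
def fields_after_re_apply(current_fields, opts)
  if opts[:fields]
    opts[:fields].dup                               # replace all the fields
  elsif opts[:merge_in_fields]
    current_fields.merge(opts[:merge_in_fields])    # add / override fields
  else
    current_fields.dup                              # leave the fields untouched
  end
end

current = { 'customer' => 'alice', 'total' => 10 }

p fields_after_re_apply(current, :fields => { 'customer' => 'bob' })
p fields_after_re_apply(current, :merge_in_fields => { 'customer' => 'bob' })
```

With :fields the 'total' entry is gone, while with :merge_in_fields it is kept and only 'customer' is overridden.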
A shorter version of register_participant
engine.register 'alice', MailParticipant, :target => 'alice@example.com'
or a block registering mechanism.
engine.register do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
  catchall ParticipantCharlie, 'flavour' => 'coconut'
end
Originally implemented in ruote-kit by Torsten Schoenebaum.
# File lib/ruote/engine.rb, line 407
def register (*args, &block)
  if args.size > 0
    register_participant(*args, &block)
  else
    proxy = ParticipantRegistrationProxy.new(self)
    block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  end
end
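The arity check in the listing means the registration block can be written in two ways: without a block parameter (the block is instance_eval'ed on the proxy) or with one (the proxy is passed in). The sketch below shows both forms against a hypothetical `SketchProxy` that merely records calls; it stands in for ParticipantRegistrationProxy, whose actual interface is not shown here.

```ruby
# Hypothetical proxy that only records the registrations it receives.
class SketchProxy
  attr_reader :registered

  def initialize
    @registered = []
  end

  def participant(name, klass = nil, opts = {})
    @registered << [name, klass, opts]
  end
end

# Same dispatch as in Engine#register when no arguments are given.
def register_with(proxy, &block)
  block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  proxy
end

# Form 1: no block parameter, calls are resolved against the proxy.
implicit = register_with(SketchProxy.new) do
  participant 'bravo', 'Participants::Bravo', 'flavour' => 'peach'
end

# Form 2: explicit block parameter, the proxy is handed to the block.
explicit = register_with(SketchProxy.new) do |reg|
  reg.participant 'bravo', 'Participants::Bravo', 'flavour' => 'peach'
end

p implicit.registered
p explicit.registered
```

The explicit form keeps `self` unchanged inside the block, which can matter when the block needs to call methods of the surrounding object.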
Registers a participant in the engine. Returns the participant instance.
Some examples :
require 'ruote/part/hash_participant'
alice = engine.register_participant 'alice', Ruote::HashParticipant
# register an in-memory (hash) store for Alice's workitems
engine.register_participant 'compute_sum' do |wi|
  wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
    s + c * v # sum + count * value
  end
  # a block participant implicitly replies to the engine immediately
end
class MyParticipant
  def initialize (name)
    @name = name
  end
  def consume (workitem)
    workitem.fields['rocket_name'] = @name
    send_to_the_moon(workitem)
  end
  def cancel (fei, flavour)
    # do nothing
  end
end
engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')
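The 'compute_sum' block participant above relies on inject with a destructured `(c, v)` block parameter. Outside of the engine, the same computation can be tried on a plain array; the shape of the 'articles' field (a list of [count, value] pairs) is an assumption made for this sketch, as is the helper name `compute_sum`.

```ruby
# Sums count * value over a list of [count, value] pairs.
def compute_sum(articles)
  articles.inject(0) do |s, (c, v)|
    s + c * v # sum + count * value
  end
end

# Hypothetical workitem fields: 2 articles at 10 and 3 articles at 5.
fields = { 'articles' => [[2, 10], [3, 5]] }
fields['sum'] = compute_sum(fields['articles'])

p fields['sum'] # => 35
```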
Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be in other Ruby runtimes. This implies that if you have registered a participant instance (instead of passing its classname and options), that participant will only run in the worker 'embedded' in the engine where it was registered. In other words, participants instantiated at registration time (and that includes block participants) only run in one worker, always the same one.
'stateless' participants, instantiated at each dispatch, are preferred. Any worker can handle them.
Block participants are still fine for demos, where the worker is included in the engine (see all the quickstarts). Small engines with 1 worker are not that bad either; not everybody is building huge systems.
Here is a 'stateless' participant example :
class MyStatelessParticipant
  def initialize (opts)
    @opts = opts
  end
  def consume (workitem)
    workitem.fields['rocket_name'] = @opts['name']
    send_to_the_moon(workitem)
  end
  def cancel (fei, flavour)
    # do nothing
  end
end
engine.register_participant(
  'moon', MyStatelessParticipant, 'name' => 'saturn5')
Remember that the options (the hash that follows the class name), must be serialisable via JSON.
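One practical consequence of the JSON requirement is that Symbol keys and values do not survive as Symbols. The sketch below uses Ruby's standard json library to round-trip an options hash; whether a given storage serialises options exactly this way is an assumption, and `json_round_trip` is a helper name introduced for the example.

```ruby
require 'json'

# Serialises the options to JSON and reads them back.
def json_round_trip(opts)
  JSON.parse(JSON.generate(opts))
end

opts = { :flavour => 'peach', 'count' => 3, :kind => :fruit }

p json_round_trip(opts)
# => {"flavour"=>"peach", "count"=>3, "kind"=>"fruit"}
```

A participant reading its options should therefore look them up by String key ('flavour'), as the stateless example above does with @opts['name'].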
It's OK to register a participant by passing its full classname as a String.
engine.register_participant(
  'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
engine.register_participant(
  'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
Note the load_path / require_path options, which point to the Ruby file containing the participant implementation. 'require' will load and eval the Ruby code only once, 'load' each time.
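The require / load distinction is plain Ruby behaviour and can be observed without an engine. The sketch below writes a throwaway file that increments a global counter each time it is evaluated; the file name, the `$sketch_evals` global and the `count_evaluations` helper are all made up for this illustration.

```ruby
require 'tmpdir'

# Returns [evaluations after two requires, evaluations after two more loads].
def count_evaluations
  $sketch_evals = 0
  Dir.mktmpdir do |dir|
    path = File.join(dir, 'sketch_participant.rb')
    File.write(path, "$sketch_evals += 1\n")

    2.times { require path } # evaluated on the first call only
    after_require = $sketch_evals

    2.times { load path }    # evaluated on every call
    [after_require, $sketch_evals]
  end
end

p count_evaluations # => [1, 3]
```

This suggests 'load_path' for participant code that changes while the engine is running, and 'require_path' otherwise.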
# File lib/ruote/engine.rb, line 381
def register_participant (regex, participant=nil, opts={}, &block)
  pa = @context.plist.register(regex, participant, opts, block)
  @context.storage.put_msg(
    'participant_registered',
    'regex' => regex.to_s,
    'engine_worker_only' => (pa != nil))
  pa
end
Replays at a given error (hopefully you fixed the cause of the error before replaying...)
# File lib/ruote/engine.rb, line 124
def replay_at_error (err)
  msg = err.msg.dup
  action = msg.delete('action')
  msg['replay_at_error'] = true
    # just an indication
  if msg['tree'] && fei = msg['fei']
    #
    # nukes the expression in case of [re]apply
    #
    exp = Ruote::Exp::FlowExpression.fetch(@context, fei)
    exp.unpersist_or_raise if exp
  end
  @context.storage.delete(err.to_h) # remove error
  @context.storage.put_msg(action, msg) # trigger replay
end
Shuts down the engine. This mostly passes the shutdown message to the other services and hopes they'll shut down properly.
# File lib/ruote/engine.rb, line 237
def shutdown
  @context.shutdown
end
Returns the storage this engine works with (the one passed at engine initialization).
# File lib/ruote/engine.rb, line 72
def storage
  @context.storage
end
A convenience method for
sp = Ruote::StorageParticipant.new(engine)
simply do
sp = engine.storage_participant
# File lib/ruote/engine.rb, line 474
def storage_participant
  @storage_participant ||= Ruote::StorageParticipant.new(self)
end
Removes/unregisters a participant from the engine.
# File lib/ruote/engine.rb, line 419
def unregister_participant (name_or_participant)
  re = @context.plist.unregister(name_or_participant)
  raise(ArgumentError.new('participant not found')) unless re
  @context.storage.put_msg(
    'participant_unregistered',
    'regex' => re.to_s)
end
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn't see events handled by 'other' workers.
This method is only useful for test/quickstart/examples environments.
engine.wait_for(:alpha)
# will make the current thread block until a workitem is delivered
# to the participant named 'alpha'
engine.wait_for('123432123-9043')
# will make the current thread block until the process whose
# wfid is given (String) terminates or produces an error.
engine.wait_for(5)
# will make the current thread block until 5 messages have been
# processed on the workqueue...
engine.wait_for(:empty)
# will return as soon as the engine/storage is empty, ie as soon
# as there are no more processes running in the engine (no more
# expressions placed in the storage)
It's OK to wait for multiple wfids :
engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
# File lib/ruote/engine.rb, line 272
def wait_for (*items)
  logger = @context['s_logger']
  raise(
    "can't wait_for, there is no logger that responds to that call"
  ) unless logger.respond_to?(:wait_for)
  logger.wait_for(items)
end
Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (and the worker is running somewhere else (hopefully)).
# File lib/ruote/engine.rb, line 81
def worker
  @context.worker
end
A convenience method for advanced users (like Oleg).
Given a fei (flow expression id), fetches the workitem as stored in the expression with that fei. This is the "applied workitem": if the workitem is currently handed to a participant, this method will return the workitem as applied, not the workitem as saved by the participant/user in whatever worklist it uses. If you need that workitem, do the vanilla thing and ask the [storage] participant or its worklist for it.
The fei might be a string fei (result of fei.to_storage_id), a FlowExpressionId instance or a hash.
# File lib/ruote/engine.rb, line 522
def workitem (fei)
  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))
  Ruote::Workitem.new(fexp.h.applied_workitem)
end