Object
Ruote is a process definition interpreter. It doesn't directly "read" process definitions, it relies on a parser/generator to produce "abstract syntax trees" that look like
[ expression_name, { ... attributes ... }, [ children_expressions ] ]
The nodes (and leaves) in the trees are expressions. This is the base class for all expressions.
The most visible expressions are "define", "sequence" and "participant". Think :
pdef = Ruote.process_definition do
sequence do
participant :ref => 'customer'
participant :ref => 'accounting'
participant :ref => 'logistics'
end
end
Each node is an expression...
're-opening' the FlowExpression class to add the methods about variables.
Reopening the FlowExpression class to add [un]persist methods.
Those methods are mixed in FlowExpression. They were put here to offload FlowExpression and especially, to gather them around their attribute topic.
# File lib/ruote/exp/flowexpression.rb, line 166
#
# Routes a message to the expression it targets.
#
# A 'reply' whose fei carries an engine_id different from the context's
# engine_id is handed over to the participant looked up under that
# engine_id (see ft_37 for a test/example); an error is raised when no
# such participant is found.
#
# Any other message is sent as "do_#{action}" to the expression fetched
# from the storage. The fetch is attempted up to three times, sleeping
# 0.028s after each miss (this retry system is only useful with
# ruote-couch). When the expression cannot be found, nothing happens.
def self.do_action(context, msg)

  target = msg['fei']
  verb = msg['action']

  if verb == 'reply' && target['engine_id'] != context.engine_id

    # the reply has to go to another engine

    remote_id = target['engine_id']
    relay = context.plist.lookup(remote_id, msg['workitem'])

    unless relay
      raise "no EngineParticipant found under name '#{remote_id}'"
    end

    relay.reply(target, msg['workitem'])

    return
  end

  # normal case

  expression = nil
  attempts = 0

  while attempts < 3
    expression = fetch(context, msg['fei'])
    break if expression
    sleep 0.028
    attempts += 1
  end

  expression.send("do_#{verb}", msg) if expression
end
Fetches an expression from the storage and readies it for service.
# File lib/ruote/exp/flowexpression.rb, line 141
#
# Reads the 'expressions' document stored under the storage id of +fei+
# and turns it into an expression instance via from_h.
#
# Returns nil when +fei+ is nil or when the storage has no such document.
def self.fetch(context, fei)

  return nil if fei.nil?

  stored = context.storage.get('expressions', Ruote.to_storage_id(fei))
  return nil unless stored

  from_h(context, stored)
end
Instantiates expression back from hash.
# File lib/ruote/exp/flowexpression.rb, line 132
#
# Builds an expression instance out of the hash +h+ : the class is
# resolved by asking context.expmap for the expression class registered
# under h['name'], then instantiated with (context, h).
def self.from_h(context, h)

  context.expmap.expression_class(h['name']).new(context, h)
end
Keeping track of names and aliases for the expression
# File lib/ruote/exp/flowexpression.rb, line 156
#
# Declares the names (and aliases) of an expression class. The names are
# turned into strings and made available through a class-level
# 'expression_names' method defined with meta_def.
def self.names(*exp_names)

  labels = exp_names.map { |name| name.to_s }

  meta_def(:expression_names) { labels }
end
# File lib/ruote/exp/flowexpression.rb, line 83
#
# Wraps the hash +h+ into an expression bound to +context+.
#
# After handing +h+ to the h= writer, the missing entries are filled in
# (values already present are left untouched) : _id, 'type', name,
# children, created_time and the on_cancel / on_error / on_timeout
# handlers (read from the attributes of the expression). The fei of the
# applied workitem is always overwritten with the fei of the expression.
def initialize(context, h)

  @context = context
  @msg = nil

  self.h = h

  h._id = Ruote.to_storage_id(h.fei) unless h._id
  h['type'] = 'expressions' unless h['type']
  h.name = self.class.expression_names.first unless h.name
  h.children = [] unless h.children
  h.applied_workitem['fei'] = h.fei
  h.created_time = Ruote.now_to_utc_s unless h.created_time

  h.on_cancel = attribute(:on_cancel) unless h.on_cancel
  h.on_error = attribute(:on_error) unless h.on_error
  h.on_timeout = attribute(:on_timeout) unless h.on_timeout
end
Returns true if the given fei points to an expression in the parent chain of this expression.
# File lib/ruote/exp/flowexpression.rb, line 466
#
# Walks up the parent chain : false when this expression has no parent,
# true when the direct parent is +fei+, else the question is forwarded
# to the parent expression.
def ancestor?(fei)

  pid = h.parent_id

  if !pid
    false
  elsif pid == fei
    true
  else
    parent.ancestor?(fei)
  end
end
Returns the value for attribute 'key', this value should be present in the array list 'values'. If not, the default value is returned. By default, the default value is the first element of 'values'.
# File lib/ruote/exp/ro_attributes.rb, line 75
#
# Reads attribute +key+ (as a string) and returns it if it is one of
# +values+. Else the fallback is returned : opts[:default] when given,
# the first element of +values+ otherwise.
#
# Note : the :mandatory and :enforce checks were commented out in the
# original source, no error is raised for missing or invalid values.
def att(key, values, opts={})

  fallback = opts[:default] || values.first

  raw = attribute(key)
  candidate = raw ? raw.to_s : raw

  return candidate if values.include?(candidate)

  fallback
end
Looks up the value for attribute n.
# File lib/ruote/exp/ro_attributes.rb, line 48
#
# Looks up the value of attribute +n+ (the name is turned into a string).
#
# options :
#
# * :default : returned when the attribute holds nil
# * :escape : when set, the raw value is returned, Ruote.dosub is not
#   applied to it
# * :to_s or :string : when set, a non-nil/non-false result is turned
#   into a string
def attribute(n, workitem=h.applied_workitem, options={})

  key = n.to_s
  raw = attributes[key]

  value =
    if raw.nil?
      options[:default]
    elsif options[:escape]
      raw
    else
      Ruote.dosub(raw, self, workitem)
    end

  wants_string = options[:to_s] || options[:string]

  (value && wants_string) ? value.to_s : value
end
Given something like
sequence do participant 'alpha' end
in the context of the participant expression
attribute_text()
will yield 'alpha'.
# File lib/ruote/exp/ro_attributes.rb, line 151
#
# Finds the first attribute key whose value is nil (the 'text' of the
# expression), turns it into a string and passes it through Ruote.dosub.
# When there is no such key, the empty string is what gets substituted.
def attribute_text(workitem=h.applied_workitem)

  atts = attributes
  bare_key = atts.keys.detect { |name| atts[name].nil? }

  Ruote.dosub(bare_key.to_s, self, workitem)
end
# File lib/ruote/exp/flowexpression.rb, line 568
#
# Returns the attributes of this expression, ie the second element of
# its tree ([ expression_name, { ... attributes ... }, [ children ] ]).
def attributes

  node = tree

  node[1]
end
This default implementation cancels all the [registered] children of this expression.
# File lib/ruote/exp/flowexpression.rb, line 373
#
# Default cancel implementation.
#
# Without children there is nothing to cancel : the applied workitem is
# replied to the parent right away. Else the expression gets persisted
# (if do_persist returns false, the expression is stale and the cancel
# message is discarded) and a 'cancel' message is emitted for each child,
# flagged with 'parent_id' to indicate a "cancel child".
#
# Maybe some of the children are gone or have not yet been applied,
# the messages are sent anyway.
#
# NOTE(review) : the iteration goes through a bare #children call, which
# is not defined in this excerpt (presumably a reader for h.children).
def cancel(flavour)

  if h.children.empty?
    return reply_to_parent(h.applied_workitem)
  end

  return unless do_persist

  children.each { |child_fei|
    @context.storage.put_msg(
      'cancel',
      'fei' => child_fei,
      'parent_id' => h.fei,
      'flavour' => flavour)
  }
end
Returns a Hash containing all attributes set for an expression with their values resolved.
# File lib/ruote/exp/ro_attributes.rb, line 116
#
# Builds a hash attribute name => resolved value, where each value is
# obtained via #attribute against the applied workitem (+opts+ are
# passed along to #attribute).
def compile_atts(opts={})

  resolved = {}

  attributes.keys.each do |name|
    resolved[name] = attribute(name, h.applied_workitem, opts)
  end

  resolved
end
Returns a fresh hash of all the variables visible from this expression.
This is used mainly when forgetting an expression.
# File lib/ruote/exp/ro_variables.rb, line 44
#
# Gathers the variables visible from this expression : starts from what
# the parent compiles (an empty hash when there is no parent) and merges
# the variables held by this expression, if any, on top.
def compile_variables

  visible = if h.parent_id then parent.compile_variables else {} end

  own = h.variables
  visible.merge!(own) if own

  visible
end
# File lib/ruote/exp/flowexpression.rb, line 203
#
# Called when the expression gets applied.
#
# * when Condition.apply? says no for the :if / :unless attributes, the
#   applied workitem is immediately replied to the parent
# * when the :forget attribute is 'true', the expression detaches itself :
#   it keeps a compiled copy of the visible variables, drops its
#   parent_id, flags itself as forgotten and replies to the former parent
#   with a copy of the applied workitem
#
# Then consider_tag, consider_timeout and apply are called, in that order.
def do_apply

  unless Condition.apply?(attribute(:if), attribute(:unless))
    return reply_to_parent(h.applied_workitem)
  end

  if attribute(:forget).to_s == 'true'

    former_parent = h.parent_id
    workitem_copy = Ruote.fulldup(h.applied_workitem)

    h.variables = compile_variables
    h.parent_id = nil
    h.forgotten = true

    @context.storage.put_msg(
      'reply', 'fei' => former_parent, 'workitem' => workitem_copy)
  end

  consider_tag
  consider_timeout

  apply
end
The raw handling of messages passed to expressions (the fine handling is done in the cancel method).
# File lib/ruote/exp/flowexpression.rb, line 326
#
# Raw handling of a 'cancel' message, before handing over to #cancel.
#
# * a cancel on an expression already 'cancelling' is discarded (unless
#   the flavour is 'kill')
# * expressions in the 'failed' state do not get timed out
#
# The message is kept (copied) in @msg, the state becomes 'dying' (kill),
# 'timing_out' (timeout) or 'cancelling' (anything else).
#
# When timing out, the '__timed_out__' field is set in the applied
# workitem. When cancelling, msg['on_cancel'] (if present) becomes the
# on_cancel handler, else msg['re_apply'] (true or a hash with 'tree',
# 'fields' and/or 'merge_in_fields') prepares a re-application.
def do_cancel(msg)

  flavour = msg['flavour']

  return if h.state == 'cancelling' && flavour != 'kill'
    # cancel on cancel gets discarded

  return if h.state == 'failed' && flavour == 'timeout'
    # do not timeout expressions that are "in error" (failed)

  @msg = Ruote.fulldup(msg)

  h.state = { 'kill' => 'dying', 'timeout' => 'timing_out' }.fetch(
    flavour, 'cancelling')

  case h.state

  when 'timing_out'

    h.applied_workitem['fields']['__timed_out__'] = [
      h.fei, Ruote.now_to_utc_s
    ]

  when 'cancelling'

    on_cancel_tree = msg['on_cancel']
    re_apply = msg['re_apply']

    if on_cancel_tree

      h.on_cancel = on_cancel_tree

    elsif re_apply

      re_apply = {} if re_apply == true

      h.on_cancel = re_apply['tree'] || tree

      new_fields = re_apply['fields']
      h.applied_workitem['fields'] = new_fields if new_fields

      extra_fields = re_apply['merge_in_fields']
      h.applied_workitem['fields'].merge!(extra_fields) if extra_fields
    end
  end

  cancel(flavour)
end
# File lib/ruote/exp/flowexpression.rb, line 418
#
# Handles a 'fail' message : the state becomes 'failing' and the workitem
# of the message becomes the applied workitem.
#
# With children still registered, the expression is persisted (or an
# error is raised) and a 'cancel' message is emitted for each child.
# Without children, the workitem is directly replied to the parent.
def do_fail(msg)

  @h['state'] = 'failing'
  @h['applied_workitem'] = msg['workitem']

  if h.children.size >= 1

    persist_or_raise

    h.children.each do |child_fei|
      @context.storage.put_msg('cancel', 'fei' => child_fei)
    end

  else

    reply_to_parent(@h['applied_workitem'])
  end
end
# File lib/ruote/exp/ro_persist.rb, line 111
#
# Persists this expression via do_p(:persist) and hands back its verdict
# (true : success, please go on; false : don't go on).
def do_persist

  do_p(:persist)
end
(only makes sense for the participant expression though)
# File lib/ruote/exp/flowexpression.rb, line 281
#
# Handles a 'reply' message coming from a child expression.
#
# The message is kept (copied) in @msg, for 'retry' in collision cases.
#
# When the message carries an 'updated_tree', it replaces the tree of
# the replying child (located via FlowExpressionId.child_id) in a copy
# of the current tree, which then becomes the updated tree.
#
# The child is then removed from h.children. If the expression has a
# state (failing, timing out, ...) it replies to its parent once no
# children are left, else it persists the updated children list.
# Without a state, it's a vanilla reply and #reply is called.
def do_reply(msg)

  @msg = Ruote.fulldup(msg)
    # keeping the message, for 'retry' in collision cases

  workitem = msg['workitem']
  fei = workitem['fei']

  if ut = msg['updated_tree']

    ct = tree.dup
    ct[-1] = ct.last.dup
      # Array#dup is shallow : without copying the children array as well,
      # the assignment below would write into the array shared with the
      # current tree (possibly h.original_tree) and silently alter it

    ct.last[Ruote::FlowExpressionId.child_id(fei)] = ut
    update_tree(ct)
  end

  h.children.delete(fei)
    # accept without any check ?

  if h.state != nil # failing or timing out ...

    if h.children.size < 1
      reply_to_parent(workitem)
    else
      persist_or_raise # for the updated h.children
    end

  else # vanilla reply

    reply(workitem)
  end
end
# File lib/ruote/exp/ro_persist.rb, line 116
#
# Removes this expression from the storage via do_p(:unpersist) and
# hands back its verdict (true : success, please go on; false : don't
# go on).
def do_unpersist

  do_p(:unpersist)
end
Like compile_atts, but the keys are expanded as well.
Useful for things like
set "f:${v:field_name}" => "${v:that_variable}"
# File lib/ruote/exp/ro_attributes.rb, line 130
#
# Same as compile_atts, except that each key goes through Ruote.dosub
# (against the applied workitem) before being used in the resulting hash.
# The value is still looked up under the original (unexpanded) key.
def expand_atts(opts={})

  expanded = {}

  attributes.keys.each do |name|
    expanded_name = Ruote.dosub(name, self, h.applied_workitem)
    expanded[expanded_name] = attribute(name, h.applied_workitem, opts)
  end

  expanded
end
# File lib/ruote/exp/flowexpression.rb, line 110
#
# Returns the fei of this expression wrapped in a new
# Ruote::FlowExpressionId instance (h.fei being the raw form).
def fei

  raw = h.fei

  Ruote::FlowExpressionId.new(raw)
end
Generates a sub_wfid, without hitting storage.
There's a better implementation for sure...
# File lib/ruote/exp/flowexpression.rb, line 584
#
# Computes a sub_wfid locally (the storage is not involved).
#
# The hash of a string made of the process id, the current time, the
# hash of this expression and its fei is multiplied by 1000 and the
# class-level counter (cycling from 0 to 999) is added to it. The result
# is returned as a string of digits, prefixed with "1" when the number
# is negative (its sign is dropped) and with "0" otherwise.
def get_next_sub_wfid

  seed = [
    $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect
  ].join('-')

  @@sub_wfid_counter = (@@sub_wfid_counter + 1) % 1000

  number = seed.hash * 1000 + @@sub_wfid_counter

  if number < 0 then "1#{-number}" else "0#{number}" end
end
# File lib/ruote/exp/flowexpression.rb, line 103
#
# Sets the hash behind this expression and includes Ruote::HashDot in
# the singleton class of that hash (only this very hash instance is
# affected).
def h=(hash)

  @h = hash

  (class << h; self; end).send(:include, Ruote::HashDot)
end
Looks up parent with on_error attribute and triggers it
# File lib/ruote/exp/flowexpression.rb, line 506
#
# Tries to hand the error over to the closest expression (self included)
# that has an on_error handler.
#
# Returns false (error not handled) when this expression is already
# 'failing', when no expression with an on_error is found or when the
# handler found is empty (an empty on_error nullifies the on_error of
# the ancestors).
#
# Else the '__error__' field is set in the workitem of the message, a
# 'fail' message is emitted for the expression holding the handler and
# true is returned.
def handle_on_error(msg, error)

  return false if h.state == 'failing'

  handler_owner = lookup_on_error
  return false unless handler_owner

  return false if handler_owner.on_error.to_s.empty?

  workitem = msg['workitem']

  error_record = [
    h.fei,
    Ruote.now_to_utc_s,
    error.class.to_s,
    error.message,
    error.backtrace
  ]
  workitem['fields']['__error__'] = error_record

  @context.storage.put_msg(
    'fail',
    'fei' => handler_owner.h.fei,
    'workitem' => workitem)

  true # yes, error is being handled.
end
Given a list of attribute names, returns the first attribute name for which there is a value.
# File lib/ruote/exp/ro_attributes.rb, line 37
#
# Goes through the given names, in order, and returns (as a string) the
# first one for which the attributes hold a non-nil value. Returns nil
# when none of them has a value.
def has_attribute(*args)

  args.each do |candidate|
    name = candidate.to_s
    return name unless attributes[name].nil?
  end

  nil
end
Persists and fetches the _rev identifier from the storage.
Only used by the worker when creating the expression.
# File lib/ruote/exp/ro_persist.rb, line 41
#
# Puts the expression in the storage with the :update_rev option set.
# Anything but a nil answer from the storage is considered a failure and
# raises. Returns nil.
def initial_persist

  outcome = @context.storage.put(@h, :update_rev => true)

  return nil if outcome.nil?

  raise(
    "initial_persist failed for #{Ruote.to_storage_id(h.fei)} #{tree.first}")
end
TODO : redoc rewrite needed
This method is mostly used by the worker when looking up a process name or participant name bound under a variable.
# File lib/ruote/exp/ro_variables.rb, line 126
#
# Follows a chain of variables : as long as the value found under a name
# is itself a String or a Symbol, that value is used as the next name to
# look up. Returns [ last_name, value ] for the first value that is
# neither a String nor a Symbol (nil included).
def iterative_var_lookup(k)

  value = lookup_variable(k)

  case value
  when String, Symbol then iterative_var_lookup(value)
  else [ k, value ]
  end
end
# File lib/ruote/exp/flowexpression.rb, line 435
#
# Emits a 'launch' message for +subtree+. The fei of the launched
# expression is a copy of the fei of this expression with a fresh
# 'sub_wfid' and +pos+ as 'expid'.
#
# opts :
#
# * :forget : when set, the launched expression is not registered as a
#   child, gets no parent_id and receives the compiled variables
#   of this expression
# * :variables : merged on top of the initial variables
# * :workitem : used instead of the applied workitem
def launch_sub(pos, subtree, opts={})

  sub_fei = h.fei.dup
  sub_fei['sub_wfid'] = get_next_sub_wfid
  sub_fei['expid'] = pos

  forget = opts[:forget]

  register_child(sub_fei) unless forget

  variables = forget ? compile_variables : {}
  variables.merge!(opts[:variables] || {})

  launch = {
    'fei' => sub_fei,
    'parent_id' => forget ? nil : h.fei,
    'tree' => subtree,
    'workitem' => opts[:workitem] || h.applied_workitem,
    'variables' => variables,
    'forgotten' => forget
  }

  @context.storage.put_msg('launch', launch)
end
Looks up "on_error" attribute
# File lib/ruote/exp/flowexpression.rb, line 476
#
# Returns the closest expression (self first, then up the parent chain)
# that has an on_error set, nil if there is none.
#
# When a parent is referenced but cannot be fetched, a few debug lines
# are printed to stdout and nil is returned.
def lookup_on_error

  return self if h.on_error
  return nil unless h.parent_id

  par = parent

  return par.lookup_on_error if par

  puts "~~"
  puts "parent gone for"
  p h.fei
  p h.parent_id
  p tree
  puts "~~"

  nil
end
# File lib/ruote/exp/ro_attributes.rb, line 104
#
# Looks up a value via #lval, trying in turn :
#
# * the attribute names listed in VV (direct values)
# * the names obtained by combining v, var and variable with each entry
#   of VV (the attribute names a variable)
# * the names obtained by combining f, fld and field with each entry of
#   VV (the attribute names a workitem field)
#
# VV is a constant defined outside of this excerpt (presumably the
# 'value' attribute names, to be confirmed against ro_attributes.rb).
#
# The %w[ ... ] word arrays had lost their opening delimiter in this
# copy of the source, they are restored here.
def lookup_val(att_options={})

  lval(
    VV,
    s_cartesian(%w[ v var variable ], VV),
    s_cartesian(%w[ f fld field ], VV),
    att_options)
end
prefix = 'on' => will lookup on, on_val, on_value, on_v, on_var, on_variable, on_f, on_fld, on_field...
# File lib/ruote/exp/ro_attributes.rb, line 95
#
# Like lookup_val, but for attribute names starting with +prefix+.
#
# With prefix 'on', #lval is asked to try, in turn :
#
# * on, on_val, on_value (direct values)
# * on_v, on_var, on_variable (the attribute names a variable)
# * on_f, on_fld, on_field (the attribute names a workitem field)
#
# The %w[ ... ] word arrays had lost their opening delimiter in this
# copy of the source, they are restored here.
def lookup_val_prefix(prefix, att_options={})

  lval(
    [ prefix ] + [ 'val', 'value' ].map { |s| "#{prefix}_#{s}" },
    %w[ v var variable ].map { |s| "#{prefix}_#{s}" },
    %w[ f fld field ].map { |s| "#{prefix}_#{s}" },
    att_options)
end
Looks up the value of a variable in expression tree (seen from a leaf, it looks more like a stack than a tree)
# File lib/ruote/exp/ro_variables.rb, line 55
#
# Looks up the value of a variable.
#
# The prefix (split off the name by split_prefix when not given) drives
# the lookup :
#
# * a prefix of length 2 or more goes directly to the engine variables
# * a prefix of length 1 is passed to the parent, when there is one
#
# Else the variables of this expression are searched, then the parent
# chain (only when the parent belongs to the same engine), and finally
# the engine variables. A non-nil value ends the search.
def lookup_variable(var, prefix=nil)

  var, prefix = split_prefix(var, prefix)

  if prefix.length >= 2
    return @context.storage.get_engine_variable(var)
  end

  if h.parent_id && prefix.length >= 1
    return parent.lookup_variable(var, prefix)
  end

  if h.variables
    found = Ruote.lookup(h.variables, var)
    return found unless found.nil?
  end

  if h.parent_id && h.parent_id['engine_id'] == @context.engine_id
    #
    # do not lookup variables in a remote engine ...

    begin
      return parent.lookup_variable(var, prefix)
    rescue
      # if the lookup fails (parent gone) then rescue and let go
    end
  end

  @context.storage.get_engine_variable(var)
end
A shortcut for lookup_variable
# File lib/ruote/exp/flowexpression.rb, line 118
#
# Fetches the parent expression from the storage (through
# FlowExpression.fetch, which returns nil when h.parent_id is nil or when
# the storage does not hold the expression).
def parent

  pid = h.parent_id

  Ruote::Exp::FlowExpression.fetch(@context, pid)
end
# File lib/ruote/exp/flowexpression.rb, line 114
#
# Returns the id of the parent wrapped in a new Ruote::FlowExpressionId,
# or nil when this expression has no parent_id.
def parent_id

  pid = h.parent_id

  return nil unless pid

  Ruote::FlowExpressionId.new(pid)
end
# File lib/ruote/exp/ro_persist.rb, line 88
#
# Calls try_persist and raises when it returns something (ie when the
# persist did not succeed). The class of what was returned is included
# in the error message. Returns nil on success.
def persist_or_raise

  failure = try_persist

  return unless failure

  raise(
    "persist failed for " +
    "#{Ruote.to_storage_id(h.fei)} #{tree.first} #{failure.class}")
end
# File lib/ruote/exp/flowexpression.rb, line 228
#
# Replies to the parent expression (or terminates the flow when there is
# no parent).
#
# Steps :
#
# * if the expression has a tagname, the corresponding variable is unset
#   and a 'left_tag' message is emitted
# * a pending timeout schedule is removed (unless the expression is
#   currently timing out)
# * in the 'failing' state, on_error is triggered (implicit, #fail got
#   called); in the 'cancelling' / 'timing_out' states, on_cancel /
#   on_timeout is triggered if such a handler is set
# * else (vanilla reply) the expression is removed from the storage
#   (when +delete+ is set; if the removal fails, the reply is dropped)
#   and a 'reply' message goes to the parent. Without a parent, a
#   'ceased' (forgotten expression) or 'terminated' message is emitted.
def reply_to_parent(workitem, delete=true)

  if h.tagname

    unset_variable(h.tagname)

    @context.storage.put_msg(
      'left_tag', 'tag' => h.tagname, 'fei' => h.fei)
  end

  schedule_id = h.timeout_schedule_id

  if schedule_id && h.state != 'timing_out'
    @context.storage.delete_schedule(h.timeout_schedule_id)
  end

  handler =
    case h.state
    when 'failing' then 'on_error' # on_error is implicit (#fail got called)
    when 'cancelling' then h.on_cancel ? 'on_cancel' : nil
    when 'timing_out' then h.on_timeout ? 'on_timeout' : nil
    else nil
    end

  return trigger(handler, workitem) if handler

  # vanilla reply

  return if delete && !do_unpersist

  if h.parent_id

    @context.storage.put_msg(
      'reply',
      'fei' => h.parent_id,
      'workitem' => workitem.merge!('fei' => h.fei),
      'updated_tree' => h.updated_tree) # nil most of the time

  else

    ending = h.forgotten ? 'ceased' : 'terminated'

    @context.storage.put_msg(
      ending,
      'wfid' => h.fei['wfid'],
      'fei' => h.fei,
      'workitem' => workitem)
  end
end
Sets a variable to a given value. (will set at the appropriate level).
# File lib/ruote/exp/ro_variables.rb, line 94
#
# Binds +val+ under the variable +var+.
#
# The expression that owns the variable is determined by locate_var, the
# binding is then done (and persisted) in that expression. An
# ArgumentError is raised when locate_var finds no owner (engine level
# variable).
def set_variable(var, val)

  owner, name = locate_var(var)

  if owner.nil?
    raise ArgumentError.new("cannot set var at engine level : #{var}")
  end

  owner.un_set_variable(:set, name, val, true)
end
Turns this FlowExpression instance into a Hash (well, just hands back the base hash behind it).
# File lib/ruote/exp/flowexpression.rb, line 125
#
# Hands back the hash behind this expression (the very same instance,
# not a copy).
def to_h

  @h
end
Returns the current version of the tree (returns the updated version if it got updated).
# File lib/ruote/exp/flowexpression.rb, line 540
#
# Returns the updated tree when there is one, the original tree
# otherwise.
def tree

  updated = h.updated_tree

  updated ? updated : h.original_tree
end
# File lib/ruote/exp/flowexpression.rb, line 572
#
# Returns the children of the tree of this expression, ie the third
# element of [ expression_name, { ... attributes ... }, [ children ] ].
def tree_children

  node = tree

  node[2]
end
# File lib/ruote/exp/ro_persist.rb, line 53
#
# Puts the hash behind this expression into the storage and returns
# whatever the storage answered (callers like persist_or_raise treat a
# nil/false answer as a success).
def try_persist

  @context.storage.put(@h)
end
# File lib/ruote/exp/ro_persist.rb, line 64
#
# Deletes this expression from the storage. When the storage answers
# with something (removal did not succeed), that answer is returned
# as is.
#
# On success, any error recorded for this expression in the journal
# ('errors' document "err_#{storage_id}") is removed as well, since the
# expression is now gone there is no need to keep track of its errors.
# nil is returned.
def try_unpersist

  answer = @context.storage.delete(@h)

  return answer if answer

  error_id = "err_#{Ruote.to_storage_id(h.fei)}"
  error_doc = @context.storage.get('errors', error_id)

  @context.storage.delete(error_doc) if error_doc

  nil
end
# File lib/ruote/exp/ro_persist.rb, line 98
#
# Calls try_unpersist and raises when it returns something (ie when the
# removal did not succeed). The class of what was returned is included
# in the error message. Returns nil on success.
def unpersist_or_raise

  failure = try_unpersist

  return unless failure

  raise(
    "unpersist failed for " +
    "#{Ruote.to_storage_id(h.fei)} #{tree.first} #{failure.class}")
end
Unbinds a variable.
# File lib/ruote/exp/ro_variables.rb, line 107
#
# Unbinds the variable +var+.
#
# The expression that owns the variable is determined by locate_var. An
# ArgumentError is raised when locate_var finds no owner (engine level
# variable).
#
# The owner gets persisted after the removal, except when the owner is
# this very expression (don't use a ticket when the expression wants to
# modify its own vars).
def unset_variable(var)

  fexp, v = locate_var(var)

  # the message used to read "cannot set var ...", a leftover from
  # set_variable that was misleading when unsetting
  raise(
    ArgumentError, "cannot unset var at engine level : #{var}"
  ) if fexp.nil?

  should_persist = (fexp.h.fei != h.fei)

  fexp.un_set_variable(:unset, v, nil, should_persist)
end
Updates the tree of this expression
update_tree(t)
will set the updated tree to t
update_tree
will copy (deep copy) the original tree as the updated_tree.
Adding a child to a sequence expression :
seq.update_tree
seq.updated_tree[2] << [ 'participant', { 'ref' => 'bob' }, [] ]
seq.do_persist
# File lib/ruote/exp/flowexpression.rb, line 560
#
# Sets the updated tree of this expression : +t+ when given, else a deep
# copy (Ruote.fulldup) of the original tree.
def update_tree(t=nil)

  h.updated_tree = t ? t : Ruote.fulldup(h.original_tree)
end
# File lib/ruote/exp/flowexpression.rb, line 641
#
# Applies the child at +child_index+ : the 'apply' message is prepared
# by pre_apply_child, this expression gets persisted (unless the child
# is to be forgotten) and the message is emitted.
def apply_child(child_index, workitem, forget=false)

  apply_msg = pre_apply_child(child_index, workitem, forget)

  persist_or_raise if !forget

  @context.storage.put_msg('apply', apply_msg)
end
# File lib/ruote/exp/flowexpression.rb, line 656
#
# Stores the value of the :tag attribute as h.tagname. When there is a
# tag, a variable named after it is bound to the fei of this expression
# and an 'entered_tag' message is emitted.
def consider_tag

  tag = attribute(:tag)
  h.tagname = tag

  return unless tag

  set_variable(h.tagname, h.fei)

  @context.storage.put_msg(
    'entered_tag', 'tag' => h.tagname, 'fei' => h.fei)
end
Called by do_apply. Overridden in ParticipantExpression.
# File lib/ruote/exp/flowexpression.rb, line 669
#
# Reads the :timeout attribute and hands it to do_schedule_timeout.
def consider_timeout

  timeout = attribute(:timeout)

  do_schedule_timeout(timeout)
end
# File lib/ruote/exp/ro_attributes.rb, line 160
#
# Returns a pair [ variable_target, field_target ] :
#
# * the first element is the value of the first attribute set among
#   to_v, to_var and to_variable
# * the second element is the value of the first attribute set among
#   to_f, to_fld and to_field
def determine_tos

  [
    [ :to_v, :to_var, :to_variable ],
    [ :to_f, :to_fld, :to_field ]
  ].map do |names|

    value = nil

    names.each do |name|
      value = attribute(name)
      break if value
    end

    value
  end
end
# File lib/ruote/exp/ro_persist.rb, line 123
#
# Runs try_persist or try_unpersist (+pers+ being :persist or :unpersist)
# and tells the caller whether it may go on :
#
# * the attempt returned true : false is returned (don't go on)
# * the attempt returned a Hash : that hash becomes the hash behind this
#   expression, the message kept in @msg is played again
#   ("do_#{action}") and false is returned (don't go on)
# * anything else : true is returned (success, please go on)
def do_p(pers)

  outcome = self.send("try_#{pers}")

  if outcome.equal?(true)

    false # don't go on

  elsif outcome.is_a?(Hash)

    self.h = outcome
    self.send("do_#{@msg['action']}", @msg)

    false # don't go on

  else

    true # success, please go on
  end
end
Called by consider_timeout (FlowExpression) and schedule_timeout (ParticipantExpression).
# File lib/ruote/exp/flowexpression.rb, line 677
#
# Registers an 'at' schedule that will emit a 'cancel' message with the
# 'timeout' flavour for this expression. The id of the schedule is kept
# in h.timeout_schedule_id.
#
# Nothing is scheduled when +timeout+ is nil/false or blank.
def do_schedule_timeout(timeout)

  return if !timeout || timeout.strip == ''

  cancel_msg = {
    'action' => 'cancel',
    'fei' => h.fei,
    'flavour' => 'timeout'
  }

  h.timeout_schedule_id =
    @context.storage.put_schedule('at', h.fei, timeout, cancel_msg)
end
Returns the flow expression that owns a variable (or the one that should own it) and the var without its potential / prefixes.
# File lib/ruote/exp/ro_variables.rb, line 182
#
# Returns [ owner_expression, variable_name_without_prefix ], or nil for
# an engine variable (prefix of length 2 or more).
#
# With a prefix of length 1 and a parent, the parent is asked. Else the
# first expression holding variables, starting from self and going up
# the parent chain, is the owner. An error is raised when the chain is
# exhausted.
def locate_var(var, prefix=nil)

  name, depth = split_prefix(var, prefix)

  if depth.length >= 2
    nil # engine variable
  elsif depth.length == 1 && h.parent_id
    parent.locate_var(name, depth)
  elsif h.variables
    [ self, name ] # no prefix...
  elsif h.parent_id
    parent.locate_var(name, depth)
  else
    raise "uprooted var lookup, something went wrong"
  end
end
# File lib/ruote/exp/ro_attributes.rb, line 173
#
# Resolves a value out of three families of attribute names :
#
# * +vals+ : the value of the first attribute present is returned
# * +vars+ : the first attribute present names a variable, the value of
#   that variable is returned
# * +flds+ : the first attribute present names a workitem field, the
#   value of that field (in the applied workitem) is returned
#
# Returns nil when none of the names is present.
#
# NOTE(review) : relies on #has_att, which is not defined in this excerpt
# (presumably an alias of #has_attribute).
#
# TODO : what about leveraging workitem#lookup ?
def lval(vals, vars, flds, att_options)

  workitem = h.applied_workitem

  direct = has_att(*vals)
  return attribute(direct, workitem, att_options) if direct

  via_var = has_att(*vars)
  if via_var
    var_name = attribute(via_var, workitem, att_options)
    return lookup_variable(var_name)
  end

  via_field = has_att(*flds)
  if via_field
    field_name = attribute(via_field, h.applied_workitem, att_options)
    return h.applied_workitem['fields'][field_name]
  end

  nil
end
# File lib/ruote/exp/flowexpression.rb, line 623
#
# Prepares the 'apply' message for the child at +child_index+.
#
# The fei of the child is the fei of this expression with
# "_#{child_index}" appended to its expid. Unless +forget+ is set, the
# child is added to h.children and receives this expression as parent.
# A forgotten child gets no parent but receives the compiled variables
# and is flagged with 'forgotten'.
def pre_apply_child(child_index, workitem, forget)

  child_expid = "#{h.fei['expid']}_#{child_index}"
  child_fei = h.fei.merge('expid' => child_expid)

  h.children << child_fei unless forget

  msg = {}
  msg['fei'] = child_fei
  msg['tree'] = tree.last[child_index]
  msg['parent_id'] = forget ? nil : h.fei
  msg['variables'] = forget ? compile_variables : nil
  msg['workitem'] = workitem
  msg['forgotten'] = true if forget

  msg
end
# File lib/ruote/exp/flowexpression.rb, line 650
#
# Adds +fei+ to the children of this expression and persists the
# expression right away (raises if the persist fails).
def register_child(fei)

  h.children.push(fei)

  persist_or_raise
end
# File lib/ruote/exp/ro_attributes.rb, line 168
#
# String cartesian product : returns an array holding "#{e0}_#{e1}" for
# each e0 in +a0+ and each e1 in +a1+ (a0 major order).
#
#   s_cartesian(%w[ v var ], %w[ val value ])
#     # => [ 'v_val', 'v_value', 'var_val', 'var_value' ]
#
# Returns an empty array when +a0+ or +a1+ is empty.
#
# (flat_map builds the result in one pass, instead of allocating a new
# accumulator array for each element of +a0+)
def s_cartesian(a0, a1)

  a0.flat_map { |e0| a1.map { |e1| "#{e0}_#{e1}" } }
end
Used by lookup_variable and set_variable to extract the prefix in a variable name
# File lib/ruote/exp/ro_variables.rb, line 167
#
# Returns [ variable_name, prefix ].
#
# When +prefix+ is given (not nil), both arguments are returned as is.
# Else the name is turned into a string and the prefix (at most two
# characters of what VAR_PREFIX_REGEX captures, '' when it does not
# match) is cut off the front of the name.
def split_prefix(var, prefix)

  return [ var, prefix ] unless prefix.nil?

  name = var.to_s
  match = VAR_PREFIX_REGEX.match(name)
  slashes = match ? match[1][0, 2] : ''

  [ name[slashes.length..-1], slashes ]
end
(Called by trigger_on_cancel & co)
# File lib/ruote/exp/flowexpression.rb, line 693
#
# Replaces this expression with a fresh application of +tree+.
#
# At first, the expression is removed from the storage (an error is
# raised when the removal answers with something hash-like, ie something
# responding to #keys). Then an 'apply' message is emitted for +tree+,
# reusing the fei, parent_id, applied workitem and variables of this
# expression; +opts+ are merged into that message.
#
# When opts['trigger'] is set, it is recorded (as a string) under the
# '_triggered' attribute of +tree+.
def supplant_with(tree, opts)

  # at first, nuke self

  removal = try_unpersist

  if removal.respond_to?(:keys)
    raise(
      "failed to remove exp to supplant " +
      "#{Ruote.to_storage_id(h.fei)} #{tree.first}")
  end

  # then re-apply

  trigger_name = opts['trigger']
  tree[1]['_triggered'] = trigger_name.to_s if trigger_name

  apply_msg = {
    'fei' => h.fei,
    'parent_id' => h.parent_id,
    'tree' => tree,
    'workitem' => h.applied_workitem,
    'variables' => h.variables
  }
  apply_msg.merge!(opts)

  @context.storage.put_msg('apply', apply_msg)
end
# File lib/ruote/exp/flowexpression.rb, line 598
#
# Returns an array of lines (graphviz dot notation) describing this
# expression :
#
# * a node labelled "wfid sub_wfid expid name", followed by the state
#   between parentheses when there is one
# * an edge towards the parent, if any
# * an edge towards each child
#
# +opts+ is not used by this implementation.
def to_dot(opts)

  i = fei()
  own_id = i.to_storage_id

  id_part = [ i.wfid, i.sub_wfid, i.expid ].join(" ")
  label = "#{id_part} #{tree.first}"
  label += " (#{h.state})" if h.state

  lines = [ %Q|"#{own_id}" [ label="#{label}" ];| ]

  # parent

  if h.parent_id
    lines << %Q|"#{own_id}" -> "#{parent_id.to_storage_id}";|
  end

  # children

  h.children.each do |cfei|
    lines << %Q|"#{own_id}" -> "#{Ruote.to_storage_id(cfei)}";|
  end

  lines
end
'on_{error|timeout|cancel}' triggering
# File lib/ruote/exp/flowexpression.rb, line 722
#
# Triggers the handler stored under +on+ ('on_error', 'on_timeout' or
# 'on_cancel') in the hash behind this expression.
#
# * on_error 'undo' : the state becomes 'failed' and the workitem is
#   replied to the parent, nothing gets re-applied
# * on_error / on_timeout 'redo' : the expression is supplanted with its
#   own tree
# * any other string handler is turned into the tree [ handler, {}, [] ]
# * a non-string handler is used as the tree
def trigger(on, workitem)

  handler = h[on]

  if on == 'on_error' && handler == 'undo'
    h.state = 'failed'
    reply_to_parent(workitem)
    return
  end

  redoable = (on == 'on_error' || on == 'on_timeout')

  replacement =
    if redoable && handler == 'redo'
      tree
    elsif handler.is_a?(String)
      [ handler, {}, [] ]
    else
      handler
    end

  supplant_with(replacement, 'trigger' => on)
end
Sets (or unsets) the value of a local variable
val should be nil in case of 'unset'.
# File lib/ruote/exp/ro_variables.rb, line 141
#
# Sets (op == :set) or unsets (any other op) the local variable +var+ in
# the variables of this expression. +val+ should be nil in case of
# 'unset'.
#
# Nothing more happens unless +should_persist+ is set. Else the
# expression is persisted :
#
# * on success, a "variable_set" / "variable_unset" message is emitted
# * when try_persist answers with a Hash (the persist failed), that hash
#   becomes the hash behind this expression and the operation is
#   attempted again
# * any other answer raises, since there is nothing to retry against
#   (do_p treats a true answer as "don't go on")
#
# The reloaded hash goes through the h= writer (as in do_p), so that it
# gets Ruote::HashDot; assigning @h directly left the retry with a plain
# hash that does not respond to #variables.
def un_set_variable(op, var, val, should_persist)

  if op == :set
    Ruote.set(h.variables, var, val)
  else # op == :unset
    Ruote.unset(h.variables, var)
  end

  return unless should_persist

  r = try_persist

  unless r # success

    return @context.storage.put_msg(
      "variable_#{op}", 'var' => var, 'fei' => h.fei)
  end

  # persist failed

  unless r.is_a?(Hash)
    raise(
      "failed to #{op} variable '#{var}', " +
      "could not persist expression (#{r.inspect})")
  end

  self.h = r # have to retry, against the reloaded expression

  un_set_variable(op, var, val, true)
end
Generated with the Darkfish Rdoc Generator 2.