Provides methods for 'local' participants.
Assumes the class that includes this module has a context method that points to the worker or engine ruote context.
It's "local" because it has access to the ruote storage.
Use this method to re_dispatch the workitem.
It takes two options, :in and :at, for a "later re_dispatch" (a delayed one).
Look at the unschedule_re_dispatch method for an example of participant implementation that uses re_dispatch.
Without either of those options, the method acts as a "reject": the workitem is put back for dispatch immediately.
# File lib/ruote/part/local_participant.rb, line 54
def re_dispatch (workitem, opts={})

  msg = {
    'action' => 'dispatch',
    'fei' => workitem.h.fei,
    'workitem' => workitem.h,
    'participant_name' => workitem.participant_name,
    'rejected' => true
  }

  if t = opts[:in] || opts[:at]

    sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg)

    fexp = fetch_flow_expression(workitem)
    fexp.h['re_dispatch_sched_id'] = sched_id
    fexp.try_persist

  else

    @context.storage.put_msg('dispatch', msg)
  end
end
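The two branches of re_dispatch can be exercised without a running engine. What follows is only a minimal sketch: FakeStorage and sketch_re_dispatch are hypothetical stand-ins written for this illustration (they are not part of ruote), the fei is a simplified made-up hash, and the flow expression bookkeeping is left out.

```ruby
# Hypothetical stand-in for the ruote storage; it only records calls.
class FakeStorage
  attr_reader :msgs, :schedules

  def initialize
    @msgs = []
    @schedules = []
  end

  def put_msg(action, msg)
    @msgs << [action, msg]
  end

  def put_schedule(flavour, owner_fei, time, msg)
    @schedules << [flavour, owner_fei, time, msg]
    "sched-#{@schedules.size}" # made-up schedule id
  end
end

# Mirrors only the branching of re_dispatch: schedule when :in or :at
# is given, otherwise queue a 'dispatch' message right away.
def sketch_re_dispatch(storage, fei, opts = {})
  msg = { 'action' => 'dispatch', 'fei' => fei, 'rejected' => true }
  if (t = opts[:in] || opts[:at])
    storage.put_schedule('at', fei, t, msg)
  else
    storage.put_msg('dispatch', msg)
    nil
  end
end

storage = FakeStorage.new
fei = { 'expid' => '0_0' } # simplified, made-up expression id

sketch_re_dispatch(storage, fei)               # immediate: a "reject"
sketch_re_dispatch(storage, fei, :in => '10m') # delayed re_dispatch

puts "queued messages: #{storage.msgs.size}"
puts "schedules: #{storage.schedules.size}"
```

In the real method the schedule id is also stored in the flow expression under 're_dispatch_sched_id', which is what lets unschedule_re_dispatch find it later.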
WARNING: this method is only for 'stateless' participants, i.e. participants that are registered in the engine by passing their class and a set of options, as in
engine.register_participant 'alpha', MyParticipant, 'info' => 'none'
This reject method places the workitem back in the [internal] message queue of the ruote engine (since it's a local participant, it has access to the storage, so this is easy). The idea is that another worker will pick up the workitem and do the participant dispatching.
This is an advanced technique. It was requested by people who want to have multiple workers and have only certain workers/participants do the handling. Using reject is not the best method; it's probably better to implement this by re-opening the Ruote::Worker class and changing the cannot_handle(msg) method.
reject could be useful anyway. It's not certain yet, but one could imagine scenarios where some participants reject workitems temporarily (while the same participant on another worker would accept them).
Well, here it is, use with care.
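As an illustration of the scenario described above, here is a minimal sketch of a stateless participant that only handles workitems on workers with a matching role. RejectRecorder is a hypothetical stand-in for Ruote::LocalParticipant so that the example runs without an engine; the 'role' option and the WORKER_ROLE environment variable are assumptions made up for this example.

```ruby
# Hypothetical stand-in for Ruote::LocalParticipant: it only records
# which of reply / reject was called.
module RejectRecorder
  def calls
    @calls ||= []
  end

  def reply(workitem)
    calls << :reply
  end

  def reject(workitem)
    calls << :reject
  end
end

# Made-up minimal workitem, just enough for the sketch.
StubWorkitem = Struct.new(:fields)

class RoleBoundParticipant
  include RejectRecorder

  def initialize(opts)
    @opts = opts
  end

  def consume(workitem)
    if ENV['WORKER_ROLE'] == @opts['role']
      # this worker is meant to handle the workitem
      workitem.fields['handled_by'] = ENV['WORKER_ROLE']
      reply(workitem)
    else
      # hand the workitem back so that another worker may pick it up
      reject(workitem)
    end
  end
end

ENV['WORKER_ROLE'] = 'billing'
part = RoleBoundParticipant.new('role' => 'billing')
part.consume(StubWorkitem.new({}))
puts part.calls.inspect
```

With the real module, registration would presumably follow the form shown above (class plus options). If no worker ever matches the role, the workitem would keep bouncing between workers, which is one more reason to use this with care.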
Cancels the scheduled re_dispatch, if any.
An example of a 'retrying participant':
class RetryParticipant

  include Ruote::LocalParticipant

  def initialize (opts)
    @opts = opts
  end

  def consume (workitem)
    begin
      do_the_job
      reply(workitem)
    rescue
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end

  def cancel (fei, flavour)
    unschedule_re_dispatch(fei)
  end
end
Note how unschedule_re_dispatch is used in the cancel method. Warning: this example could loop forever, since it re-dispatches every time do_the_job fails.
# File lib/ruote/part/local_participant.rb, line 106
def unschedule_re_dispatch (fei)

  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))

  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end
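One way to keep the retrying participant from looping forever is to count attempts in the workitem itself and give up after a limit. The following is a minimal sketch, not part of ruote: RetryRecorder is a hypothetical stand-in for Ruote::LocalParticipant so that the example runs on its own, the 'max_tries' option and the 'tries' and 'gave_up' fields are made-up names, and it assumes that modified workitem fields travel with the re-dispatched workitem.

```ruby
# Hypothetical stand-in for Ruote::LocalParticipant: it only records calls.
module RetryRecorder
  def calls
    @calls ||= []
  end

  def reply(workitem)
    calls << [:reply, workitem]
  end

  def re_dispatch(workitem, opts = {})
    calls << [:re_dispatch, opts]
  end

  def unschedule_re_dispatch(fei)
    calls << [:unschedule, fei]
  end
end

# Made-up minimal workitem, just enough for the sketch.
RetryWorkitem = Struct.new(:fields)

class BoundedRetryParticipant
  include RetryRecorder

  def initialize(opts)
    @opts = opts
  end

  def consume(workitem)
    do_the_job
    reply(workitem)
  rescue StandardError
    # remember how many times this workitem has been tried so far
    tries = workitem.fields['tries'].to_i + 1
    workitem.fields['tries'] = tries
    if tries < (@opts['max_tries'] || 3)
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    else
      # limit reached: flag the workitem and let the flow resume
      workitem.fields['gave_up'] = true
      reply(workitem)
    end
  end

  def cancel(fei, flavour)
    unschedule_re_dispatch(fei)
  end

  private

  def do_the_job
    raise 'job failed' # always fails here, to exercise the retry path
  end
end

part = BoundedRetryParticipant.new('max_tries' => 2)
wi = RetryWorkitem.new({})
part.consume(wi) # first failure: re_dispatch is requested
part.consume(wi) # second failure: gives up and replies
puts wi.fields.inspect
```

Replying with a 'gave_up' flag is only one possible policy; letting the error propagate instead of rescuing it may be preferable when the process definition has its own error handling.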