Ruote::LocalParticipant

Provides methods for 'local' participants.

Assumes the class that includes this module has a context method that points to the worker or engine ruote context.

It's "local" because it has access to the ruote storage.

Attributes

context[RW]

the reply_to_engine method is there

Public Instance Methods

re_dispatch(workitem, opts={})

Use this method to re_dispatch the workitem.

It accepts two options, :in and :at, for scheduling a "later re_dispatch".

Look at the unschedule_re_dispatch method for an example of participant implementation that uses re_dispatch.

Without either of those options, the method acts as a "reject": the dispatch message is put straight back onto the message queue.
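The branching between "later re_dispatch" and "reject" can be sketched without a running engine. The following is only an illustration: RecordingStorage and sketch_re_dispatch are hypothetical stand-ins written for this example, not part of ruote, and the value passed to :at is a placeholder since the accepted time format is not described here.

```ruby
# Hypothetical stand-in for the ruote storage: it only records calls.
class RecordingStorage
  attr_reader :msgs, :schedules

  def initialize
    @msgs = []
    @schedules = []
  end

  def put_msg(action, msg)
    @msgs << [action, msg]
  end

  def put_schedule(flavour, fei, time, msg)
    @schedules << [flavour, fei, time, msg]
    "sched-#{@schedules.size}" # pretend schedule id
  end
end

# Simplified restatement of the decision made by re_dispatch (assumption:
# only the :in / :at branching is modelled, nothing else).
def sketch_re_dispatch(storage, fei, opts = {})
  msg = { 'action' => 'dispatch', 'fei' => fei, 'rejected' => true }
  t = opts[:in] || opts[:at]

  if t
    storage.put_schedule('at', fei, t, msg) # later re_dispatch
  else
    storage.put_msg('dispatch', msg)        # immediate "reject"
  end
end

storage = RecordingStorage.new
sketch_re_dispatch(storage, 'fei-0')                          # no option: reject
sketch_re_dispatch(storage, 'fei-1', :in => '1s')             # relative delay
sketch_re_dispatch(storage, 'fei-2', :at => 'some-time-spec') # placeholder value
```

In this sketch the first call ends up in msgs and the other two in schedules, which mirrors the two paths described above.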

# File lib/ruote/part/local_participant.rb, line 54
def re_dispatch (workitem, opts={})

  msg = {
    'action' => 'dispatch',
    'fei' => workitem.h.fei,
    'workitem' => workitem.h,
    'participant_name' => workitem.participant_name,
    'rejected' => true
  }

  if t = opts[:in] || opts[:at]

    sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg)

    fexp = fetch_flow_expression(workitem)
    fexp.h['re_dispatch_sched_id'] = sched_id
    fexp.try_persist

  else

    @context.storage.put_msg('dispatch', msg)
  end
end
Also aliased as: reject
reject(workitem, opts={})

WARNING: this method is only for 'stateless' participants, i.e. participants that are registered in the engine by passing their class and a set of options, as in

engine.register_participant 'alpha', MyParticipant, 'info' => 'none'

This reject method puts the workitem back in the [internal] message queue of the ruote engine (a local participant has access to the storage, so this is easy). The idea is that another worker will pick up the workitem and do the participant dispatching.

This is an advanced technique. It was requested by people who want to run multiple workers and have only certain workers/participants do the handling. Using reject is not the best approach; it's probably better to implement this by re-opening the Ruote::Worker class and changing the cannot_handle(msg) method.

reject could still be useful: one could imagine scenarios where some participants reject workitems temporarily (while the same participant on another worker would accept them).

Well, here it is, use with care.
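As a rough illustration of the multi-worker scenario, a stateless participant might reject any workitem that reaches it on the "wrong" worker. This is a sketch under assumptions: RejectRecorder is a hypothetical stand-in for Ruote::LocalParticipant so the code runs on its own, and the 'worker_name' and 'accepted_worker' options are invented for the example.

```ruby
# Hypothetical stand-in for Ruote::LocalParticipant: it only records
# which workitems were rejected instead of touching any storage.
module RejectRecorder
  def rejected
    @rejected ||= []
  end

  def reject(workitem, opts = {})
    rejected << workitem
  end
end

class PickyParticipant
  include RejectRecorder

  attr_reader :handled

  def initialize(opts)
    @worker_name = opts['worker_name']     # invented option
    @accepted    = opts['accepted_worker'] # invented option
    @handled     = []
  end

  def consume(workitem)
    if @worker_name == @accepted
      @handled << workitem # real code would do the work, then reply
    else
      reject(workitem)     # leave it for another worker to pick up
    end
  end
end

on_right_worker = PickyParticipant.new(
  'worker_name' => 'w1', 'accepted_worker' => 'w1')
on_wrong_worker = PickyParticipant.new(
  'worker_name' => 'w2', 'accepted_worker' => 'w1')

on_right_worker.consume(:workitem_a)
on_wrong_worker.consume(:workitem_b)
```

With the real module, reject would put the workitem back on the message queue rather than append it to a list.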

Alias for: re_dispatch
unschedule_re_dispatch(fei)

Cancels the scheduled re_dispatch, if any.

An example of a 'retrying participant':

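class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  # do_the_job stands for the participant's actual work.
  # On any error, the workitem is re-dispatched after a delay.
  def consume(workitem)
    do_the_job
    reply(workitem)
  rescue
    re_dispatch(workitem, :in => @opts['delay'] || '1s')
  end

  # Drops the pending re_dispatch, if any, when the participant is cancelled.
  def cancel(fei, flavour)
    unschedule_re_dispatch(fei)
  end
end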

Note how unschedule_re_dispatch is used in the cancel method. Warning: this example could loop forever, since it re-dispatches on every failure and has no retry limit.
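One way to avoid retrying forever is to count attempts in the workitem and give up after a limit. The sketch below is an assumption-laden illustration, not ruote's own code: RetryStubs and FakeWorkitem are hypothetical stand-ins so it runs without an engine, the 'max_retries' option and the 'retry_count' and 'error' fields are invented names, and it relies on the updated fields travelling with the re-dispatched workitem (re_dispatch places workitem.h in the message).

```ruby
# Hypothetical stand-ins so the sketch runs without ruote.
FakeWorkitem = Struct.new(:fields)

module RetryStubs
  def log
    @log ||= []
  end

  def reply(workitem)
    log << [:reply, workitem.fields.dup]
  end

  def re_dispatch(workitem, opts = {})
    log << [:re_dispatch, opts]
  end
end

class BoundedRetryParticipant
  include RetryStubs

  def initialize(opts)
    @opts = opts
  end

  def consume(workitem)
    do_the_job
    reply(workitem)
  rescue StandardError => e
    tries = workitem.fields['retry_count'].to_i + 1
    workitem.fields['retry_count'] = tries

    if tries > (@opts['max_retries'] || 3)
      workitem.fields['error'] = e.message
      reply(workitem) # give up: hand the workitem back with the error noted
    else
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end

  private

  def do_the_job
    raise 'job failed' # always fails here, to exercise the retry path
  end
end

participant = BoundedRetryParticipant.new('max_retries' => 2)
workitem = FakeWorkitem.new({})
3.times { participant.consume(workitem) }
```

Here the first two failures lead to a re_dispatch and the third replies with an 'error' field set. Whether to reply or to raise at that point is a design choice that depends on how the process definition should react to a permanent failure.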

# File lib/ruote/part/local_participant.rb, line 106
def unschedule_re_dispatch (fei)

  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))

  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end
