Class/Module Index [+]

Quicksearch

Ruote::Exp::ParticipantExpression

The 'participant' expression is very special. It sits on the fence between the engine and the external world.

The participant expression is used to pass workitems to participants from the engine. Those participants are bound at start time (usually) in the engine via its register_participant method.

Here's an example of two concurrent participant expressions in use :

concurrence do
  participant :ref => 'alice'
  participant :ref => 'bob'
end

Upon encountering the two expressions, the engine will lookup their name in the participant map and hand the workitems to the participant instances registered for those names.

attributes passed as arguments

All the attributes passed to a participant will be fed to the outgoing workitem under a new 'params' field.

Thus, with

participant :ref => 'alice', :task => 'maw the lawn', :timeout => '2d'

Alice will receive a workitem with a field params set to

{ :ref => 'alice', :task => 'maw the lawn', :timeout => '2d' }

The fields named 'params' will be deleted before the workitems resumes in the flow (with the engine replying to the parent expression of this participant expression).

simplified participant notation

This process definition is equivalent to the one above. Less to write.

concurrence do
  participant 'alice'
  bob
end

Please note that 'bob' alone could stand for the participant 'bob' or the subprocess named 'bob'. Subprocesses do take precedence over participants (if there is a subprocess named 'bob' and a participant named 'bob'.

participant defined timeout

Usually, timeouts are given for an expression in the process definition.

participant 'alice', :timeout => '2d'

where alice as two days to complete her task (send back the workitem).

But it's OK for participant classes registered in the engine to provide their own timeout value. The participant instance simply has to reply to the timeout method and provide a meaningful timeout value.

Note however, that the process definition timeout (if any) will take precedence over the participant specified one.

asynchronous

The expression will make sure to dispatch to the participant in an asynchronous way. This means that the dispatch will occur in a dedicated thread.

Since the dispatching to the participant could take a long time and block the engine for too long, this 'do thread' policy is used by default.

If the participant itself replies to the method do_not_thread and replies positively to it, a new thread (or a next_tick) won't get used. This is practical for tiny participants that don't do IO and reply immediately (after a few operations). By default, BlockParticipant instances do not thread.

Public Instance Methods

apply() click to toggle source
# File lib/ruote/exp/fe_participant.rb, line 128
def apply

  #
  # determine participant

  h.participant_name = (attribute(:ref) || attribute_text).to_s

  raise ArgumentError.new(
    "no participant name specified"
  ) if h.participant_name == ''

  participant_info =
    h.participant ||
    @context.plist.lookup_info(h.participant_name, h.applied_workitem)

  unless participant_info.respond_to?(:consume)
    h.participant = participant_info
  end

  raise(ArgumentError.new(
    "no participant named #{h.participant_name.inspect}")
  ) if participant_info.nil?

  #
  # participant found, consider timeout

  schedule_timeout(participant_info)

  #
  # dispatch to participant

  h.applied_workitem['participant_name'] = h.participant_name
  h.applied_workitem['fields']['params'] = compile_atts

  persist_or_raise

  @context.storage.put_msg(
    'dispatch',
    'fei' => h.fei,
    'participant_name' => h.participant_name,
    'participant' => h.participant,
    'workitem' => h.applied_workitem,
    'for_engine_worker?' => (participant_info.class != Array))
end
cancel(flavour) click to toggle source
# File lib/ruote/exp/fe_participant.rb, line 173
def cancel (flavour)

  return reply_to_parent(h.applied_workitem) unless h.participant_name
    # no participant, reply immediately

  do_persist || return
    #
    # if do_persist returns false, it means we're operating on stale
    # data and cannot continue

  @context.storage.put_msg(
    'dispatch_cancel',
    'fei' => h.fei,
    'participant_name' => h.participant_name,
    'participant' => h.participant,
    'flavour' => flavour,
    'workitem' => h.applied_workitem)
end
reply(workitem) click to toggle source
# File lib/ruote/exp/fe_participant.rb, line 192
def reply (workitem)

  pinfo =
    h.participant ||
    @context.plist.lookup_info(h.participant_name, workitem)

  pa = @context.plist.instantiate(pinfo, :if_respond_to? => :on_reply)

  pa.on_reply(Ruote::Workitem.new(workitem)) if pa

  super(workitem)
end
reply_to_parent(workitem) click to toggle source
# File lib/ruote/exp/fe_participant.rb, line 205
def reply_to_parent (workitem)

  workitem['fields'].delete('params')
  workitem['fields'].delete('dispatched_at')
  super(workitem)
end

Protected Instance Methods

consider_timeout() click to toggle source

Overriden with an empty behaviour. The work is now done a bit later via the schedule_timeout method.

# File lib/ruote/exp/fe_participant.rb, line 231
def consider_timeout
end
do_dispatched(msg) click to toggle source

Once the dispatching work (done by the dispatch pool) is done, a 'dispatched' msg is sent, we have to flag the participant expression as 'dispatched' => true

See groups.google.com/group/openwferu-users/browse_thread/thread/ff29f26d6b5fd135 for the motivation.

# File lib/ruote/exp/fe_participant.rb, line 221
def do_dispatched (msg)

  h.dispatched = true
  do_persist
    # let's not care if it fails...
end
schedule_timeout(p_info) click to toggle source

Determines and schedules timeout if any.

Note that process definition timeout has priority over participant specified timeout.

# File lib/ruote/exp/fe_participant.rb, line 239
def schedule_timeout (p_info)

  timeout = attribute(:timeout)

  unless timeout
    pa = @context.plist.instantiate(p_info, :if_respond_to? => :timeout)
    timeout = pa.timeout if pa
  end

  do_schedule_timeout(timeout)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.