RuoteAMQP::ParticipantProxy

AMQP Participants

RuoteAMQP::ParticipantProxy lets you send workitems (serialized as JSON) or plain messages to any AMQP queue directly from a process definition. Combined with RuoteAMQP::Receiver, it makes local/remote participant combinations straightforward to build.

For local/remote participants, the local part of RuoteAMQP::ParticipantProxy relies on the presence of a RuoteAMQP::Receiver. Workitems are sent to the remote participant, and the local part does not normally reply to the engine. Instead, the engine continues when a reply is received on the 'ruote_workitems' queue (see RuoteAMQP::Receiver).

Of course, the standard :forget => true option can be used even with remote participants, and :forget can also be set as a default in the registration options.

A simple way to create a remote participant to act upon workitems is to use the daemon-kit ruote responder.
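
To make the round trip concrete, here is a minimal sketch of what a remote participant does with an incoming workitem, independent of any broker. It assumes the JSON payload carries its data under a 'fields' key; the handle_workitem name, the 'deleted' field and the sample payload are hypothetical. A real responder would publish the returned string to the 'ruote_workitems' queue.

```ruby
require 'json'

# Hypothetical handler: decodes a JSON workitem, records a result in its
# fields, and returns the JSON string to send back as the reply.
def handle_workitem(raw_json)
  workitem = JSON.parse(raw_json)

  # Assumption: the payload keeps its data under a 'fields' key.
  fields = (workitem['fields'] ||= {})
  fields['deleted'] = true # hypothetical result of the remote work

  JSON.generate(workitem)
end

# Sample payload, made up for illustration.
incoming = JSON.generate('fields' => { 'user' => 'alice' })
reply = handle_workitem(incoming)
```

The reply keeps every field it received and only adds to them, so the engine resumes the flow with the original data plus the remote result.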

Simple AMQP messages are treated as 'fire and forget': the flow continues as soon as the local participant has queued the message for sending, as there is no meaningful way to receive a workitem in reply.

Configuration

AMQP configuration is handled by directly manipulating the values of the AMQP.settings hash, as provided by the AMQP gem. No AMQP defaults are set by the participant.
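
As a rough sketch of that kind of direct manipulation, the snippet below copies a set of connection values into a settings hash. The configure_amqp helper and the BROKER_SETTINGS values are hypothetical; in a real application the hash passed in would be AMQP.settings, which is left in a comment here so the example runs without the gem.

```ruby
# Hypothetical connection values; replace them with your broker's details.
BROKER_SETTINGS = {
  :host  => 'localhost',
  :vhost => '/',
  :user  => 'guest',
  :pass  => 'guest'
}.freeze

# Copies each override into the given settings hash and returns it.
def configure_amqp(settings, overrides = BROKER_SETTINGS)
  overrides.each { |key, value| settings[key] = value }
  settings
end

# In an application that has loaded the amqp gem, before the engine starts:
#   configure_amqp(AMQP.settings)
```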

Usage

Define the queue used by an AMQP participant:

engine.register_participant(
  :delete_user, RuoteAMQP::ParticipantProxy, 'queue' => 'user_manager')

Sending a workitem to the remote participant defined above:

Ruote.process_definition do
  sequence do
    delete_user
  end
end

Let the local participant reply to the engine without involving the receiver:

Ruote.process_definition do
  sequence do
    delete_user :forget => true
  end
end

Setting up the participant in a slightly more 'raw' way:

engine.register_participant(
  :amqp, RuoteAMQP::ParticipantProxy )

Sending a workitem to a specific queue:

Ruote.process_definition do
  sequence do
    amqp :queue => 'test', 'command' => '/run/regression_test'
  end
end

Set up a 'fire and forget' participant that always replies to the engine:

engine.register_participant(
  :jfdi, RuoteAMQP::ParticipantProxy, 'forget' => true )

Sending a message to a specific queue (both steps are equivalent):

Ruote.process_definition do
  sequence do
    amqp :queue => 'test', :message => 'foo'
    amqp :queue => 'test', :message => 'foo', :forget => true
  end
end

AMQP notes

The participant currently only makes use of direct exchanges. Possible future improvements might see use for topic and fanout exchanges as well.

The direct exchanges are always marked as durable by the participant, and messages are marked as persistent by default (see #RuoteAMQP).

Public Class Methods

new(options)

The following parameters are used in the process definition.

An options hash with the same keys to provide defaults is accepted at registration time (see above).

  • :queue => (string) The AMQP queue used by the remote participant. nil by default.

  • :forget => (bool) Whether the flow should continue without waiting for the remote participant's reply. false by default, meaning the flow blocks until a reply arrives.

# File lib/ruote-amqp/participant.rb, line 121
def initialize(options)

  RuoteAMQP.start!

  @options = {
    'queue' => nil,
    'forget' => false,
  }.merge(options.inject({}) { |h, (k, v)|
    h[k.to_s] = v; h
  })
    #
    # the inject is here to make sure that all options have String keys
end
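
The key normalization in initialize can be tried in isolation. The sketch below reuses the same inject and merge on a plain hash; the normalize_options name and the DEFAULT_OPTIONS constant are hypothetical and only wrap the logic shown above.

```ruby
# Defaults copied from the initialize method.
DEFAULT_OPTIONS = { 'queue' => nil, 'forget' => false }.freeze

# Converts every key to a String, then lets the given options override
# the defaults.
def normalize_options(options)
  stringified = options.inject({}) { |h, (k, v)| h[k.to_s] = v; h }
  DEFAULT_OPTIONS.merge(stringified)
end

from_symbols = normalize_options(:queue => 'user_manager')
from_strings = normalize_options('queue' => 'user_manager')
```

Both calls produce the same hash, which is why :queue and 'queue' are interchangeable at registration time.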

Public Instance Methods

cancel(fei, flavour)
# File lib/ruote-amqp/participant.rb, line 176
def cancel(fei, flavour)
  #
  # TODO : sending a cancel item is not a bad idea, especially if the
  #        job done over the amqp fence lasts...
  #
end

consume(workitem)

Process the workitem at hand. By default the workitem is published to the direct exchange named by the 'queue' workitem parameter. Specify a 'message' workitem parameter to have that message sent instead of the workitem.

# File lib/ruote-amqp/participant.rb, line 140
def consume(workitem)

  target_queue = determine_queue(workitem)

  raise 'no queue specified (outbound delivery)' unless target_queue

  q = MQ.queue( target_queue, :durable => true )
  @forget = determine_forget( workitem )

  opts = {
    :persistent => RuoteAMQP.use_persistent_messages?,
    :content_type => 'application/json',
    :reply_to => 'ruote_workitems',
  }

  if message = workitem.fields['message'] || workitem.params['message']

    @forget = true # sending a message implies 'forget' => true

    q.publish(message, opts)

  else

    q.publish(encode_workitem(workitem), opts)
  end

  reply_to_engine( workitem ) if @forget
end
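
The branching in consume can be followed without a broker. The sketch below is a hypothetical plan_delivery helper that returns what would be published instead of publishing it. Since determine_queue, determine_forget and encode_workitem are not shown here, their behaviour is assumed: a step parameter is taken to override the registration default, and the workitem is encoded as a JSON object with a 'fields' key.

```ruby
require 'json'

# Hypothetical stand-in for consume's decision logic; nothing is published.
def plan_delivery(fields, params, options = {})
  # Assumption: a per-step parameter overrides the registration default.
  queue = params['queue'] || options['queue']
  raise 'no queue specified (outbound delivery)' unless queue

  forget = params.key?('forget') ? params['forget'] : options['forget']

  if (message = fields['message'] || params['message'])
    # A plain message is sent as-is and always implies forget.
    { :queue => queue, :payload => message, :forget => true }
  else
    # Stand-in for encode_workitem: serialize the fields as JSON.
    payload = JSON.generate('fields' => fields)
    { :queue => queue, :payload => payload, :forget => forget ? true : false }
  end
end

message_step  = plan_delivery({}, { 'queue' => 'test', 'message' => 'foo' })
workitem_step = plan_delivery({ 'user' => 'alice' }, {}, { 'queue' => 'user_manager' })
```

In this sketch message_step is marked as forget even though no :forget parameter was given, while workitem_step is not, so only the latter would wait for a reply from the receiver.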

do_not_thread()

The current AMQP (0.6.7) has 1 queue per thread. If you let the default "one thread per participant consume call" kick in, you'll end up with 1 queue per consume call (and...)

So, by returning true here, we force the queue to be always the same.

Many thanks to github.com/weifeng365 for reporting this issue and suggesting the fix.

TODO: should we have something to close queues when the engine / worker shuts down? Or is it already covered in #stop?

# File lib/ruote-amqp/participant.rb, line 196
def do_not_thread

  true
end
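
The effect of threading on per-thread state can be illustrated without the amqp gem. The sketch below uses a hypothetical thread_local_queue helper that caches one object per thread in Thread.current. This is assumed to approximate how per-thread queues arise; it is a stand-in, not the gem's code.

```ruby
# Hypothetical stand-in: one "queue" object per thread, stored in
# Thread.current.
def thread_local_queue
  Thread.current[:demo_queue] ||= Object.new
end

# Same thread, repeated calls: a single object is reused.
same_thread = 3.times.map { thread_local_queue }.uniq

# One new thread per call: every call creates its own object.
per_thread = 3.times.map { Thread.new { thread_local_queue }.value }.uniq
```

Here same_thread holds one object and per_thread holds three, which mirrors the difference between keeping consume calls on one thread and letting each call run in its own.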

stop()

(Stops the underlying queue subscription)

# File lib/ruote-amqp/participant.rb, line 171
def stop

  RuoteAMQP.stop!
end
