Object
The class where the dispatching of workitems to [real] participants is done.
It can be extended or replaced for better handling of threads (for example, by using a thread pool, or no threads at all).
# File lib/ruote/part/dispatch_pool.rb, line 74
#
# Hands a workitem message over to its participant.
#
# The participant is looked up in the context's participant list using
# msg['participant'] (falling back to msg['participant_name']) together
# with msg['workitem'].
#
# When the participant responds to #do_not_thread and that method returns
# a truthy value, the dispatch is done directly via #do_dispatch;
# otherwise it goes through #do_threaded_dispatch.
def dispatch(msg)
  name = msg['participant'] || msg['participant_name']
  target = @context.plist.lookup(name, msg['workitem'])

  # participants may opt out of threading by exposing #do_not_thread
  unthreaded = target.respond_to?(:do_not_thread) && target.do_not_thread

  return do_dispatch(target, msg) if unthreaded

  do_threaded_dispatch(target, msg)
end
# File lib/ruote/part/dispatch_pool.rb, line 56
#
# Asks a participant to cancel the work identified by msg['fei'], then
# puts a 'reply' message (carrying the fei and the workitem) into the
# storage.
#
# The participant is instantiated from msg['participant'] and its #cancel
# is called with a Ruote::FlowExpressionId and the flavour found in
# msg['flavour'].
#
# Any exception raised by the participant is re-raised, except when the
# flavour is 'kill': in that case it is ignored and the reply is still
# emitted.
def dispatch_cancel(msg)
  kind = msg['flavour']
  fei = msg['fei']
  target = @context.plist.instantiate(msg['participant'])

  begin
    target.cancel(Ruote::FlowExpressionId.new(fei), kind)
  rescue Exception => error
    # a 'kill' goes on no matter what the participant raised
    raise(error) unless kind == 'kill'
  end

  @context.storage.put_msg('reply', 'fei' => fei, 'workitem' => msg['workitem'])
end
# File lib/ruote/part/dispatch_pool.rb, line 86
#
# Performs the actual dispatch: wraps msg['workitem'] in a Ruote::Workitem,
# stamps its 'dispatched_at' field with the value of Ruote.now_to_utc_s,
# and passes it to the participant's #consume.
#
# Once #consume has returned, a 'dispatched' message carrying msg['fei']
# is put into the storage, so that the participant expression gets
# flagged as 'dispatched' asynchronously.
def do_dispatch(participant, msg)
  item = Ruote::Workitem.new(msg['workitem']).tap do |wi|
    wi.fields['dispatched_at'] = Ruote.now_to_utc_s
  end

  participant.consume(item)

  # only emitted after a successful consume
  @context.storage.put_msg('dispatched', 'fei' => msg['fei'])
end
# File lib/ruote/part/dispatch_pool.rb, line 99
#
# Runs #do_dispatch for the given participant and message in a new Thread
# and returns that thread.
#
# Any exception raised during the dispatch is caught inside the thread and
# passed, along with the message, to the context's error handler.
#
# There is no limit on the number of dispatch threads. Maybe at some point
# such a limit would be OK, or maybe it's the job of an extension /
# subclass.
def do_threaded_dispatch(participant, msg)
  Thread.new(participant, msg) do |target, message|
    begin
      do_dispatch(target, message)
    rescue Exception => error
      @context.error_handler.msg_handle(message, error)
    end
  end
end
Generated with the Darkfish Rdoc Generator 2.