How to - AMQP queue management

AMQP queue management in Orchestrator

 

Overview

 

Orchestrator comes with a plugin for AMQP (Advanced Message Queuing Protocol) to get interfaced with AMQP compatible servers like RabbitMQ or Apache Qpid.

 

In a request handling workflow, a request for insert is made to the AMQP queue serving requests. The message to insert (XML content for example) is in input of this request.

 

A response handling workflow is reading through the AMQP queue serving responses. When a response is obtained, it is mapped to an existing request via a synchronization mechanism. The response message (XML format for example) is passed back to the workflow handling the requests.

 

Synchronization

 

The synchronization mechanism relies on registering a unique id, called correlation id  into an internal table (shared_states) of the MySQL database. 

This correlation id is used to map a response with a request.

The correlation id must be generated earlier in the workflow (e.g. collected from a video metadata information) and must be unique.

 

The registering is done in the request handling workflow (“register request” step below). In addition to the correlation id, a status with value “blocked” is added in the created table entry. The correlation id XML is obtained from the request message within the register request step. The currently executing workorder id is also added in the created entry.

 

 amqp_request.JPG

 

Register request code:

my_xml_response = inputs['xml_request']

xml_structure   = XmlSimple.xml_in my_xml_request { 'ForceArray' => false }

correlation_id = xml_structure['correlationId']

#add an entry in DB for the correlation id (in path), the status = blocked (in name) and the workorder id (in entry)

id = SharedState.persist_to_db({:workorder_id=>@work_order_id}, "/#{correlation_id}/blocked")

 

When a response is obtained in the response handling workflow, the entry in the table is looked up based on the correlation id and status “blocked” (“register response” step). If found, but with a referenced workorder not in a stable state (e.g. being canceled or failed or already completed), a failure is returned.

Otherwise the entry is updated with the response payload and its status is set to “released”.

The response returned by the AMQP plugin must contain the correlation id matching the one passed in the request.

 

 amqp_response.JPG

 

Register response code:

my_xml_response = inputs['xml_response']

xml_structure   = XmlSimple.xml_in my_xml_response { 'ForceArray' => false }

correlation_id = xml_structure['correlationId']

 

#find the entry in DB for this correlation id

states = SharedState.find(:all, :conditions=>["path = '/#{correlation_id}' AND name = 'blocked'"])

 

if states.size == 0

  @status_details = "No entry registered for correlation id #{correlation_id}"

  @status = STATUS_FAILED

elsif states.size == 1

  workorder_id = states[0].data[:workorder_id]

  workOrder=WorkOrder.find(:first, :conditions=>["id =?",workorder_id])

  if ((workOrder.nil?) || workOrder.isStable?) #error as workorder is not on going (cancelled, complete, failed, etc)

    #remove DB entry

    states[0].destroy

    @status_details = "Workorder #{workorder_id} is not on-going"

    @status = STATUS_FAILED

  else

    #update entry status

    row_id = states[0].id

    new_entry = {:workorder_id=>workorder_id,:xml_response=>my_xml_response}   # add the xml response in the entry field

    states[0].data = new_entry

    states[0].name = 'released'

    states[0].save

    #Database.execute("update shared_states set name = 'released' where id = #{row_id}")

    @status_details = "Response received for correlation id #{correlation_id} and workorder id #{workorder_id}"

    @status = STATUS_COMPLETE

  end

else

  #remove all entries

  states.each do |state|

    state.destroy

  end

  @status_details = "More than 1 entry in DB for #{correlation_id}"

  @status = STATUS_FAILED

End

 

The request handling workflow is waiting on a table entry matching the correlation id and status “released” (custom trigger “wait for response step”). When found, but with a workorder id different than the currently executing workorder id, a failure is returned. Otherwise the response payload is obtained from the table entry and put in output of the step. The found entry is deleted from the table.

 

 amqp_request.JPG

 

Wait for response code (within check status section):

correlation_id = inputs['correlation_id']

#find the entry in DB for this correlation id

states = SharedState.find(:all, :conditions=>["path = '/#{correlation_id}' AND name = 'released'"])

 

if states.size == 0

  @status_details = "No response yet for correlation id #{correlation_id}"

  @status = STATUS_INPROGRESS

elsif states.size == 1

  workorder_id_db = states[0].data[:workorder_id]

  workorder_id_current = @work_order_id

  if (workorder_id_db != workorder_id_current) #error as workorder id from DB does not match current workorder id

    @status_details = "Workorder id in DB #{workorder_id_db} does not match current workorder id #{workorder_id_current}"

    @status = STATUS_FAILED

  else

    #get xml response out of the entry field

    outputs['xml_response'] = states[0].data[:xml_response]

    #remove DB entry

    states[0].destroy

    @status_details = "Response received for correlation id #{correlation_id} and workorder id #{workorder_id_db}"

    @status = STATUS_COMPLETE

  end

else

  #remove all entries

  states.each do |state|

    state.destroy

  end

  @status_details = "More than 1 entry in DB for #{correlation_id}"

  @status = STATUS_FAILED

end

Attachments

0 Comments

Please sign in to leave a comment.
Powered by Zendesk