#!/usr/bin/python
"""Listing 4. Multiserver Server Side (Ohio).

Connects to an AMQP 0-10 broker on localhost:5672, attaches to (or creates)
the shared queue "serverListenQueueOhio" bound to "amq.topic" with the
binding key "#.ohio", and leaves a started incoming queue in ``localQueue``
for the message loop that follows.
"""
import os
import subprocess
import sys

from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
from qpid.spec import load
from qpid.session import SessionException


# ProcessRequest: this is what actually does the work.
def processRequest(requestMessage):
    """Return the canned Ohio forecast, tagged with this server's PID.

    requestMessage is accepted for interface symmetry but is not inspected;
    every request gets the same answer.
    """
    # Single-argument print(...) behaves identically on Python 2.
    print("Predicting the weather for Ohio")
    return "From Server PID " + str(os.getpid()) + ": Ohio is sunny and 70!"


# First, load the correct specification file.
locSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')

# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection(sock=socket, spec=locSpec,
                        username="guest", password="guest")
connection.start()
session = connection.session(str(uuid4()))

# Declare the listening server queue and connect to server queue.
# Create server queue if it does not exist.
myPid = os.getpid()
listenTopic = "#.ohio"
serverQueueName = "serverListenQueueOhio"
# The local destination name embeds the PID so each server process gets
# its own subscription name.
localQueueName = "localQueue_" + str(myPid)

try:
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
    print("Successfully attached to existing server queue.")
except SessionException:
    print("Could not find server queue, so I am creating it.")
    # A fresh session is opened before declaring the queue.
    # NOTE(review): presumably the failed subscribe leaves the previous
    # session unusable -- confirm against the qpid client documentation.
    session = connection.session(name=str(uuid4()), timeout=0)
    session.queue_declare(queue=serverQueueName, exclusive=False)
    session.exchange_bind(exchange="amq.topic",
                          queue=serverQueueName,
                          binding_key=listenTopic)
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
except Exception:
    print("Something broke unexpectedly.")
    # os.exit() does not exist (it would raise AttributeError); use
    # sys.exit with a non-zero status to signal the failure.
    sys.exit(1)

# Now, start a message loop.
# Serve requests forever: take a request from the local queue, compute the
# reply, and publish it to the address given in the request's reply_to.
while True:
    # Only the blocking get is expected to raise Empty, so keep the try
    # narrow; an idle timeout simply means "poll again".
    try:
        requestObj = localQueue.get(timeout=60)
    except Empty:
        continue

    # Accept the message (by id) before processing it.
    session.message_accept(RangedSet(requestObj.id))
    print("Received message.")

    requestProperties = requestObj.get("message_properties")
    replyTo = requestProperties.reply_to
    if replyTo is None:
        # Without a reply address there is nowhere to send the answer.
        # This propagates and terminates the server, as before.
        raise Exception("This message is missing the 'reply_to' property, "
                        "which is required")

    responseMessage = processRequest(requestObj.body)

    # Route the response using the exchange and routing key the client
    # supplied in reply_to.
    props = session.delivery_properties(routing_key=replyTo["routing_key"])
    print("Responding to request.")
    session.message_transfer(destination=replyTo["exchange"],
                             message=Message(props, responseMessage))