#!/usr/bin/python -tt
# Test thread safety of qmf
#
# This tests keeping a single qmf session (to avoid the high cost of creating
# connections) and using it to make many requests in several threads.
#
# We get a baseline in unthreaded operation, then compare the number of
# returned values in each thread against the baseline.  If the numbers differ
# we definitely have a problem (note, this doesn't catch all possible errors,
# but it's enough to highlight the errors that we're seeing now.  We should
# also check that the return values are the same type in order to catch all
# errors.)
#
# We expect that the script will print:
#   Unloaded nodes: [STRING OF NODES]
#   Unloaded pools: [STRING OF POOLS]
# run the threads and then exit.
#
# However, we see that when the threads are run, the lengths of the nodes and
# pools returned don't match the baseline, so we have entries printed for
#   Loaded nodes: [STRING OF NODES THAT DON'T MATCH BASELINE]
#   Loaded pools: [STRING OF POOLS THAT DON'T MATCH BASELINE]

from qmf.console import Session
import threading

# Shared qmf session; created once by init() and deliberately used from every
# thread without locking, because that sharing is what this script tests.
amqp_session = None


class ActionThread(threading.Thread):
    """Worker that re-runs the node/pool queries on the shared session.

    The results are compared, by length only, against the baseline lists
    handed to the constructor.
    """

    def __init__(self, nodes, pools):
        """Remember the baseline results to compare against.

        nodes -- baseline result of the 'node' query
        pools -- baseline result of the 'pool' query
        """
        self.nodes = nodes
        self.pools = pools
        super(ActionThread, self).__init__()

    def run(self):
        """Query nodes and pools, printing any result that differs in length."""
        # Retrieve nodes and pools.  We want to get requests to overlap and end
        # up with nodes in the pools variable and vice versa.
        nodes = amqp_session.getObjects(_class='node',
                                        _package='com.redhat.libvirt.node')
        pools = amqp_session.getObjects(_class='pool',
                                        _package='com.redhat.libvirt.pool')

        # Simplistically, check the number of nodes/pools against our baseline
        # and if they don't match, print them out.
        if len(nodes) != len(self.nodes):
            print(' Loaded nodes: %s' % nodes)
        if len(pools) != len(self.pools):
            print(' Loaded pools: %s' % pools)


def main():
    """Print an unthreaded baseline, then run 5 worker threads to completion."""
    # Retrieve a baseline in unthreaded code
    nodes = amqp_session.getObjects(_class='node',
                                    _package='com.redhat.libvirt.node')
    pools = amqp_session.getObjects(_class='pool',
                                    _package='com.redhat.libvirt.pool')
    print('Unloaded nodes: %s' % nodes)
    print('Unloaded pools: %s' % pools)

    # Start 5 threads and try to connect to the amqp server with them
    workers = [ActionThread(nodes, pools) for _ in range(5)]
    for worker in workers:
        worker.start()

    # Wait for every worker before returning so the caller can safely
    # disconnect the shared session once main() is done.
    for worker in workers:
        worker.join()


def init():
    """Create the global amqp session and connect it to the local broker.

    Returns the object handed back by addBroker() so that the caller can
    disconnect it again with amqp_session.delBroker().
    """
    # Create a global amqp session for use everywhere
    global amqp_session
    amqp_session = Session(manageConnections=False)
    return amqp_session.addBroker('amqp://localhost', timeout=10)


if __name__ == '__main__':
    broker = init()
    try:
        main()
    finally:
        # Release the broker connection even if main() raised.
        amqp_session.delBroker(broker)