Module workerpoolmanager
[hide private]
[frames | no frames]

Source Code for Module workerpoolmanager

 1  # -*- coding: utf-8 -*- 
 2   
 3  ############################################################################ 
 4  # workerpoolmanager.py 
 5  # 
 6  # Manages worker process pool. 
 7  # 
 8  # (C) 2008 Likya Software Ltd. 
 9  ############################################################################ 
10   
11  from string import Template 
12   
13  from twisted.internet import defer 
14  from mcasynqueue import mcasynprocessworker 
15   
16  from constants import Constants 
class WorkerPoolManager(object):
    '''
    Manages the worker process pool of an application server.

    A single job, generated from C{jobtemplate.py}, is registered with the
    child manager in L{start}; every L{run} call is dispatched to that job.
    '''

    def __init__(self, appServer):
        '''
        Creates the worker pool manager object.

        @param appServer: Owner Application Server of this worker pool.
        '''
        # Our application server
        self._appServer = appServer
        # Child process manager; created by start(). Declared here so the
        # attribute always exists on the instance.
        self._childManager = None
        # We run a single job on all workers, the following is the ID of that job.
        self._jobID = None
        self._numberOfWorkers = None

    @defer.inlineCallbacks
    def start(self, numberOfWorkers):
        '''
        Starts the actual worker pool.

        @type numberOfWorkers: integer
        @param numberOfWorkers: Indicates the default size of the worker pool.
        @return: Deferred that will fire up when the pool is ready.
        '''
        self._childManager = mcasynprocessworker.ChildManager()
        self._numberOfWorkers = numberOfWorkers

        # Wait for the child processes to come up; the value produced by
        # startup() is not needed here.
        yield self._childManager.startup(numberOfWorkers)

        substitutes = {
            'APPSRV_PATH': self._appServer.directory(),
        }

        # NOTE(review): 'jobtemplate.py' is a relative path, so it is resolved
        # against the current working directory of the process.
        # try/finally guarantees the handle is closed even if read() fails.
        templateFile = open('jobtemplate.py')
        try:
            templateSource = templateFile.read()
        finally:
            templateFile.close()

        remoteCode = Template(templateSource).substitute(substitutes)
        self._jobID = yield self._childManager.new(remoteCode)

    @defer.inlineCallbacks
    def run(self, *args):
        '''
        Runs the pool's job with the given arguments.

        @param args: Remote call parameters, passed unchanged to the child
                     manager after the job ID. Presumably the first one is
                     the entry point (function name) defined in
                     jobtemplate.py -- TODO confirm against ChildManager.run.
        @return: Deferred that fires with the job result.
        '''
        result = yield self._childManager.run(self._jobID, *args)
        defer.returnValue(result)

    def activeProcessCount(self):
        '''
        Returns the number of active processes.

        @rtype: integer
        @return: Number of active (busy) processes, i.e. the number of calls
                 pending for our job.
        '''
        return len(self._childManager.callsPending[self._jobID])

    def availableProcesses(self):
        '''
        Returns the number of available idle processes.

        @rtype: integer
        @return: Number of available idle processes (pool size minus the
                 number of active processes).
        '''
        return self._numberOfWorkers - self.activeProcessCount()
84