1
2
3
4
5
6
7
8
9
10
11 from string import Template
12
13 from twisted.internet import defer
14 from mcasynqueue import mcasynprocessworker
15
16 from constants import Constants
19
21 '''
22 Creates the worker pool manager object.
23
24 @param appServer: Owner Application Server of this worker pool.
25 '''
26
27 self._appServer = appServer
28
29 self._jobID = None
30 self._numberOfWorkers = None
31
@defer.inlineCallbacks
def start(self, numberOfWorkers):
    '''
    Starts the actual worker pool.

    Creates a new C{mcasynprocessworker.ChildManager}, waits for its
    C{startup()} to complete, renders C{jobtemplate.py} (a path relative to
    the current working directory) with the application server directory
    substituted for C{APPSRV_PATH}, and hands the rendered code to the
    child manager's C{new()}. The value C{new()} yields is kept as the job
    ID used by later calls.

    @type numberOfWorkers: integer
    @param numberOfWorkers: Indicates the default size of the worker pool.
    @return: Deferred that will fire up when the pool is ready.
    '''
    self._childManager = mcasynprocessworker.ChildManager()
    self._numberOfWorkers = numberOfWorkers

    # Only completion of startup matters here; its result is not used.
    yield self._childManager.startup(numberOfWorkers)

    substitutes = {
        'APPSRV_PATH': self._appServer.directory(),
    }

    # Read the template inside a context manager so the file handle is
    # closed right away instead of lingering until garbage collection.
    with open('jobtemplate.py') as templateFile:
        templateSource = templateFile.read()

    # substitute() (not safe_substitute()) is intentional: a placeholder
    # missing from `substitutes` raises KeyError rather than shipping
    # half-rendered code to the workers.
    remoteCode = Template(templateSource).substitute(substitutes)
    self._jobID = yield self._childManager.new(remoteCode)
50
@defer.inlineCallbacks
def run(self, *args):
    '''
    Runs the pool's job with the given arguments.

    The stored job ID is passed first, followed by C{args}, to the child
    manager's C{run()}.

    NOTE(review): callers appear to pass the job entry point name (a
    function defined in jobtemplate.py) as the first argument - confirm.

    @param args: Positional parameters forwarded to the remote call.
    @return: Deferred firing with the job result.
    '''
    pendingCall = self._childManager.run(self._jobID, *args)
    # Wait for the remote call and hand its value back through the
    # inlineCallbacks machinery.
    defer.returnValue((yield pendingCall))
66
68 '''
69 Returns the number of active processes.
70
71 @rtype: integer
72 @return: Number of active (busy) processes
73 '''
74 return len(self._childManager.callsPending[self._jobID])
75
77 '''
78 Returns the number of available idle processes.
79
80 @rtype: integer
81 @return: Number of available idle processes.
82 '''
83 return self._numberOfWorkers - len(self._childManager.callsPending[self._jobID])
84