Module worker
[hide private]
[frames] | no frames]

Source Code for Module worker

  1  # -*- coding: utf-8 -*- 
  2   
  3  ############################################################################ 
  4  # worker.py 
  5  # 
  6  # Worker class for MOCOP. 
  7  # 
  8  # (C) 2008 Likya Software Ltd. 
  9  ############################################################################ 
 10   
 11  import os, sys, mmap, cPickle 
 12  import sha, tempfile 
 13   
 14  import M2Crypto as m2 
 15   
 16  from twisted.internet import defer 
 17  from sqlalchemy import create_engine 
 18  from sqlalchemy.orm import sessionmaker 
 19   
 20  from constants import Constants 
 21  from util import Util 
 22  from mobilecodemanager import MobileCodeManager 
 23  from mcremote import MCRemote 
 24  from clientrequest import ClientRequest 
 25  #from mchistory import MCVersionedListener 
 26   
 27  basePath = os.path.normpath(os.path.join(os.path.dirname(sys.argv[0]), '..')) 
#import jpype
#jpype.startJVM(jpype.getDefaultJVMPath(),
#               "-Djava.class.path=%s" % os.path.join(basePath, 'dist', 'example'),
##               "-Djava.ext.dirs=%s%slib" % (root, os.sep))
#               )


class MOCSender(MCRemote):
    '''
    Class for sending mobile code to the clients.

    Both remote methods delegate to C{self._worker} and C{self.domain()},
    neither of which is defined in this class.
    NOTE(review): presumably both come from L{MCRemote} - confirm.
    '''

    @defer.inlineCallbacks
    def remote_run(self, codePath):
        '''
        Main entrance method, gets the packaged code for the given
        C{codePath} and signs it.

        The SHA digest of the package is signed with the RSA key read from
        the file C{Constants.CERTIFICATE}, resolved relative to C{basePath}.

        @param codePath: String representing the code path to be transferred
        @type codePath: C{string}
        @return: A deferred firing with a tuple containing the packaged
                 remote code and its signature.
        @rtype: tuple
        '''
        packagedCode = yield self._worker.getMobileCode(self.domain(), codePath)

        # Compute packaged code hash
        packageHash = sha.new(packagedCode).digest()

        # Read the key material and release the file handle explicitly
        # instead of leaving it to the garbage collector.
        keyFile = open(os.path.join(basePath, Constants.CERTIFICATE), 'rb')
        try:
            keyData = keyFile.read()
        finally:
            keyFile.close()

        signature = m2.RSA.load_key_string(keyData).sign(packageHash)

        defer.returnValue((packagedCode, signature))

    @defer.inlineCallbacks
    def remote_computeMD5(self, codePath):
        '''
        Gets the hash of the mobile code for the given C{codePath}, as
        computed by the worker's C{getMobileCodeMD5}.

        @type codePath: C{string}
        @param codePath: String representing the code path to be hashed
        @return: A deferred firing with whatever C{getMobileCodeMD5}
                 produced for this path.
        '''
        codeHash = yield self._worker.getMobileCodeMD5(self.domain(), codePath)
        defer.returnValue(codeHash)
70
class Worker(object):
    '''
    Runs a single client request.

    The request is fetched from the application server, the requested task
    class is imported and instantiated, one of its C{remote_*} methods is
    called, and the result (or the failure) is handed back to the server.
    Request parameters and results travel through memory mapped files whose
    layout is an 8 character hexadecimal length followed by the payload.
    '''

    # Number of bytes taken by the hexadecimal length header that precedes
    # every payload in the memory mapped files.
    _HEADER_SIZE = 8

    def __init__(self, runID, workerServices):
        '''
        Initializes a worker object

        @param runID: Unique id of this run
        @param workerServices: Services available to this worker
        '''
        # Unique id of this run
        self._id = runID
        # Methods workers can use from app server
        self._workerServices = workerServices
        # Working directory
        self._directory = basePath
        # Set up the mobile code manager
        self._mocManager = MobileCodeManager(self)
        # Database sessions, keyed according to domain name
        self._dbSessions = {}

        # Memory mapped file definitions. The result file is seeded with a
        # single byte so that there is something to map, and it is closed
        # explicitly so the byte is on disk before the file is mapped.
        self._mmapFileName = Constants.MMAP_RESULT_FILE + str(os.getpid())
        seedFile = open(self._mmapFileName, 'wb+')
        try:
            seedFile.write(' ')
        finally:
            seedFile.close()

        self._mmapFile = open(self._mmapFileName, 'rb+')
        self._mmap = mmap.mmap(self._mmapFile.fileno(), 0)

        self._mmapParamsFileName = Constants.MMAP_REQ_FILE + str(os.getpid())

    def DBSession(self, domain):
        '''
        Returns the DB session according to the given domain. A session is
        created on first use and cached under the domain name afterwards.

        @type domain: C{Domain}
        @param domain: DB session for L{Domain}
        @return: The session bound to the engine created from C{domain.db()}
        '''
        domainName = domain.name()

        if domainName not in self._dbSessions:
            engine = create_engine(domain.db())
            Session = sessionmaker(bind=engine, autocommit=False, autoflush=True, expire_on_commit=False)
            #Session = sessionmaker(bind=engine, autocommit=False, autoflush=True, expire_on_commit=False, extension=MCVersionedListener())
            self._dbSessions[domainName] = Session()

        return self._dbSessions[domainName]

    def getMobileCode(self, domain, codePath):
        '''
        Returns the mobile code as a package to be returned
        to the client.

        @type domain: C{Domain}
        @param domain: L{Domain} object representing the current domain
        @type codePath: string
        @param codePath: Path to be packaged (i.e. 'bas/controllers')
        '''
        return self._mocManager.pathPackage(domain.dist(), codePath)

    def getMobileCodeMD5(self, domain, codePath):
        '''
        Returns the mobile code hash to be returned to the client.

        @type domain: C{Domain}
        @param domain: L{Domain} object representing the current domain
        @type codePath: string
        @param codePath: Path to be hashed (i.e. 'bas/controllers')
        '''
        return self._mocManager.pathMD5(domain.dist(), codePath)

    def directory(self):
        '''
        Returns the current working directory of the application server.

        @rtype: C{string}
        @return: Current working directory.
        '''
        return self._directory

    def workerServices(self):
        '''
        Return the worker services.

        @return: Worker services for this worker
        '''
        return self._workerServices

    @defer.inlineCallbacks
    def getClientRequest(self, runID):
        '''
        Asks the server to write the request for C{runID} into the request
        memory mapped file, then reads and decodes it.

        @param runID: Unique id of the run whose request is wanted.
        @return: A deferred that will fire with a ClientRequest object.
        '''
        yield self.workerServices().callRemote('sendClientRequest', runID, self._mmapParamsFileName)

        mappedFile = open(self._mmapParamsFileName, 'rb+')
        try:
            mp = mmap.mmap(mappedFile.fileno(), 0)
            try:
                # First read the data size from memory mapped file
                dataSize = int(mp[0:self._HEADER_SIZE], 16)
                # Then read the actual data
                result = mp[self._HEADER_SIZE:dataSize + self._HEADER_SIZE]
            finally:
                # The slices above are copies, so the mapping can go.
                mp.close()
        finally:
            mappedFile.close()

        cr = ClientRequest.loads(result)
        defer.returnValue(cr)

    def initSendMsg(self, reqID):
        '''
        Informs the server of the request ID of the following client message.

        @param reqID: Unique id for the message send session.
        @type reqID: str
        @return: The deferred returned by the remote call.
        '''
        return self.workerServices().callRemote('initSendMsg', reqID)

    def _writeToMmap(self, data):
        '''
        Writes C{data} into the result memory mapped file, preceded by its
        length as 8 hexadecimal characters, growing the file if necessary.

        @param data: Already serialized payload.
        @type data: str
        '''
        dataSize = len(data)
        # Extra bytes are for hex repr. of length of data
        requiredSize = dataSize + self._HEADER_SIZE

        # Resize memory mapped file if necessary
        if self._mmap.size() < requiredSize:

            if sys.platform == 'darwin':
                # mmap.resize is not available on Mac OS X, we simulate it
                # here. The old mapping and file object are released first
                # so that they are not leaked and the file is not truncated
                # while it is still mapped.
                self._mmap.close()
                self._mmapFile.close()

                grownFile = open(self._mmapFileName, 'wb+')
                try:
                    grownFile.seek(requiredSize)
                    grownFile.write(' ')
                finally:
                    grownFile.close()

                self._mmapFile = open(self._mmapFileName, 'rb+')
                self._mmap = mmap.mmap(self._mmapFile.fileno(), 0)

            else:
                self._mmap.resize(requiredSize)

        # Write length header and payload in memory mapped file
        self._mmap[0:self._HEADER_SIZE] = "%08x" % dataSize
        self._mmap[self._HEADER_SIZE:requiredSize] = data

    def resultReady(self, runID, result):
        '''
        Serializes C{result} into the result memory mapped file and tells
        the server where to find it.

        @return: A deferred that will fire when the result is delivered to server.

        @param runID: Unique id for the message send session.
        @type runID: str
        @param result: Object to be serialized with C{Util.dumps}.
        '''
        self._writeToMmap(Util.dumps(result))

        return self.workerServices().callRemote('resultReady', runID, self._mmapFileName)

    def callFailed(self, runID, failure):
        '''
        Serializes C{failure} into the result memory mapped file and tells
        the server where to find it.

        @return: A deferred that will errback when the failure is delivered to server.

        @param runID: Unique id for the message send session.
        @param failure: Object to be serialized with C{Util.dumps}.
        '''
        self._writeToMmap(Util.dumps(failure))

        return self.workerServices().callRemote('callFailed', runID, self._mmapFileName)

    @defer.inlineCallbacks
    def run(self):
        '''
        Runs the actual task.

        The task path of the client request has the form
        C{"package.module.Class"} or C{"package.module.Class:method"}; when
        no method is given, C{run} is used. The method actually called on
        the task object is C{remote_<method>}.
        '''
        clientRequest = yield self.getClientRequest(self._id)
        taskPath = clientRequest.taskPath()

        # Interpret the taskPath
        if ':' in taskPath:
            # Do we have a method name?
            task, methodName = taskPath.split(':')
        else:
            task = taskPath
            # Default method name to be called
            methodName = 'run'

        # Switch to new working directory according to domain.dist()
        os.chdir(os.path.join(self.directory(), Constants.WORK_DIRECTORY, clientRequest.domain().dist()))

        # Modify paths so that the application server related files can be imported
        # We do this after changing the directory, this somehow disturbs windows
        # python path resolution.
        if basePath not in sys.path:
            sys.path.insert(0, basePath)

        if "." not in sys.path:
            sys.path.insert(0, ".")

        # Split task. Task is a string similar to "mcbase.controllers.serverinfo.test".
        # This example represents the "test" class in module "mcbase.controllers.serverinfo"
        taskParts = task.split('.')
        modulePrefix = '.'.join(taskParts[:-2])
        moduleName = taskParts[-2]
        className = taskParts[-1]

        # The task path comes from the client, so the module is imported
        # through __import__ and getattr rather than by formatting the names
        # into source text and exec-ing it: a crafted path can then only
        # fail to import, not run arbitrary statements.
        if modulePrefix:
            # We have a module prefix, equivalent of:
            #   from mcbase.controllers import serverinfo
            package = __import__(modulePrefix, globals(), locals(), [moduleName])
            module = getattr(package, moduleName)
        else:
            # We don't have a module prefix, equivalent of:
            #   import serverinfo
            module = __import__(moduleName)

        # This is crucial, reloading allows us to use same module and class
        # names for different domains.
        # "reload worker" will give error because of reloading itself in windows.
        if moduleName != 'worker':
            reload(module)

        # Prepare the database session if the domain has db info
        dbSession = None
        if clientRequest.domain().db():
            dbSession = self.DBSession(clientRequest.domain())

        # Instantiate class, its 'methodName' method is run below.
        taskObj = getattr(module, className)(self, clientRequest, dbSession)

        # Call the desired method of desired object. First we should get
        # the call parameters, we get parameters indirectly to cope with
        # large objects.
        # Result will not be directly returned, instead it will be returned
        # via memory mapped files, just like the reverse of getting call
        # parameters.
        try:
            result = yield getattr(taskObj, 'remote_%s' % methodName)(
                *clientRequest.args(),
                **clientRequest.kw()
            )

            if dbSession is not None:
                dbSession.commit()

            # Finally inform the application server about the result
            self.resultReady(self._id, result)

        except Exception:
            # Something went wrong with the desired method.
            e = sys.exc_info()[1]
            taskObj.log('*** Error in task: %s -=> %s' % (e.__class__, e.args))
            # Domains without db info have no session; rolling back
            # unconditionally would raise here and hide the real error.
            if dbSession is not None:
                dbSession.rollback()
            self.callFailed(self._id, e)

        self._mmap.close()
        self._mmapFile.close()
329