1
2
3
4
5
6
7
8
9
10
11 import os, sys, mmap, cPickle
12 import sha, tempfile
13
14 import M2Crypto as m2
15
16 from twisted.internet import defer
17 from sqlalchemy import create_engine
18 from sqlalchemy.orm import sessionmaker
19
20 from constants import Constants
21 from util import Util
22 from mobilecodemanager import MobileCodeManager
23 from mcremote import MCRemote
24 from clientrequest import ClientRequest
25
26
27 basePath = os.path.normpath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
28
29
30
31
32
33
34
35 -class MOCSender(MCRemote):
36 '''
37 Class for sending mobile code to the clients
38 '''
39 @defer.inlineCallbacks
41 '''
42 Main enterance method, gets the code for the given C{codePath}
43
44 @param codePath: String representing the code path to be transferred
45 @type codePath: C{string}
46 @return: A tuple containing packaged remote code and its signature.
47 @rtype: tuple
48 '''
49 packagedCode = yield self._worker.getMobileCode(self.domain(), codePath)
50
51
52 packageHash = sha.new()
53 packageHash.update(packagedCode)
54 packageHash = packageHash.digest()
55
56 signature = m2.RSA.load_key_string(open(os.path.join(basePath, Constants.CERTIFICATE), 'rb').read()).sign(packageHash)
57
58 defer.returnValue((packagedCode, signature))
59
60 @defer.inlineCallbacks
62 '''
63 Main enterance method, gets the code for the given C{codePath}
64
65 @type codePath: C{string}
66 @param codePath: String representing the code path to be transferred
67 '''
68 result = yield self._worker.getMobileCodeMD5(self.domain(), codePath)
69 defer.returnValue(result)
70
72
73 - def __init__(self, runID, workerServices):
74 '''
75 Initializes a worker object
76
77 @param workerServices: Services available to this worker
78 '''
79
80 self._id = runID
81
82 self._workerServices = workerServices
83
84 self._directory = basePath
85
86 self._mocManager = MobileCodeManager(self)
87
88 self._dbSessions = {}
89
90 self._mmapFileName = Constants.MMAP_RESULT_FILE + str(os.getpid())
91 open(self._mmapFileName, 'wb+').write(' ')
92
93 self._mmapFile = open(self._mmapFileName, 'rb+')
94 self._mmap = mmap.mmap(self._mmapFile.fileno(), 0)
95
96 self._mmapParamsFileName = Constants.MMAP_REQ_FILE + str(os.getpid())
97
99 '''
100 Returns the DB session according to the given domain
101
102 @type domain: C{Domain}
103 @param domain: DB session for L{Domain}
104 '''
105 domainName = domain.name()
106
107 if self._dbSessions.has_key(domainName):
108 retval = self._dbSessions[domainName]
109 else:
110 engine = create_engine(domain.db())
111 Session = sessionmaker(bind=engine, autocommit=False, autoflush=True, expire_on_commit=False)
112
113 retval = self._dbSessions[domainName] = Session()
114
115 return retval
116
118 '''
119 Returns the mobile code as a package to be returned
120 to the client.
121
122 @type domain: C{Domain}
123 @type domain: L{Domain} object representing the current domain
124 @type codePath: string
125 @param codePath: Path to be packaged (i.e. 'bas/controllers')
126 '''
127 return self._mocManager.pathPackage(domain.dist(), codePath)
128
130 '''
131 Returns the mobile code hash to be returned to the client.
132
133 @type domain: C{Domain}
134 @type domain: L{Domain} object representing the current domain
135 @type codePath: string
136 @param codePath: Path to be packaged (i.e. 'bas/controllers')
137 '''
138 return self._mocManager.pathMD5(domain.dist(), codePath)
139
141 '''
142 Returns the current working directory of the application server.
143
144 @rtype: C{string}
145 @return: Current working directory.
146 '''
147 return self._directory
148
150 '''
151 Return the worker services.
152
153 @return: Worker services for this worker
154 '''
155 return self._workerServices
156
157 @defer.inlineCallbacks
159 '''
160 @return: A deferred that will fire with a ClientRequest object.
161 '''
162 yield self.workerServices().callRemote('sendClientRequest', runID, self._mmapParamsFileName)
163
164 mappedFile = open(self._mmapParamsFileName, 'rb+')
165
166 mp = mmap.mmap(mappedFile.fileno(), 0)
167
168
169 dataSize = int(mp[0:8], 16)
170
171 result = mp[8:dataSize + 8]
172
173 cr = ClientRequest.loads(result)
174 defer.returnValue(cr)
175
177 '''
178 Informs the server of the request ID of the following client message.
179
180 @param reqID: Unique id for the message send session.
181 @type reqID: str
182 '''
183 return self.workerServices().callRemote('initSendMsg', reqID)
184
186 '''
187 @return: A deferred that will fire when the result is delivered to server.
188
189 @param runID: Unique id for the message send session.
190 @type runID: str
191 '''
192 result = Util.dumps(result)
193 resultSize = len(result)
194
195
196 if self._mmap.size() < resultSize + 8:
197
198 if sys.platform == 'darwin':
199
200 mappedFile = open(self._mmapFileName, 'wb+')
201 mappedFile.seek(resultSize + 8)
202 mappedFile.write(' ')
203 mappedFile.close()
204 self._mmapFile = open(self._mmapFileName, 'rb+')
205 self._mmap = mmap.mmap(self._mmapFile.fileno(), 0)
206
207 else:
208 self._mmap.resize(resultSize + 8)
209
210
211 self._mmap[0:8] = "%08x" % resultSize
212 self._mmap[8:resultSize + 8] = result
213
214 return self.workerServices().callRemote('resultReady', runID, self._mmapFileName)
215
217 '''
218 @return: A deferred that will errback when the failure is delivered to server.
219 '''
220 failure = Util.dumps(failure)
221 failureSize = len(failure)
222
223
224 if self._mmap.size() < failureSize + 8:
225
226 if sys.platform == 'darwin':
227
228 mappedFile = open(self._mmapFileName, 'wb+')
229 mappedFile.seek(failureSize + 8)
230 mappedFile.write(' ')
231 mappedFile.close()
232 self._mmapFile = open(self._mmapFileName, 'rb+')
233 self._mmap = mmap.mmap(self._mmapFile.fileno(), 0)
234
235 else:
236 self._mmap.resize(failureSize + 8)
237
238
239 self._mmap[0:8] = "%08x" % failureSize
240 self._mmap[8:failureSize + 8] = failure
241
242 return self.workerServices().callRemote('callFailed', runID, self._mmapFileName)
243
244 @defer.inlineCallbacks
246 '''
247 Runs the actual task.
248 '''
249 clientRequest = yield self.getClientRequest(self._id)
250 taskPath = clientRequest.taskPath()
251
252
253 if ':' in taskPath:
254
255 task, methodName = taskPath.split(':')
256 else:
257 task = taskPath
258
259 methodName = 'run'
260
261
262 os.chdir(os.path.join(self.directory(), Constants.WORK_DIRECTORY, clientRequest.domain().dist()))
263
264
265
266
267 if basePath not in sys.path:
268 sys.path.insert(0, basePath)
269
270 if "." not in sys.path:
271 sys.path.insert(0, ".")
272
273
274
275 taskParts = task.split('.')
276 modulePrefix = '.'.join(taskParts[:-2])
277 moduleName = taskParts[-2]
278 className = taskParts[-1]
279
280
281 if modulePrefix:
282
283
284 exec('from %s import %s' % (modulePrefix, moduleName))
285 else:
286
287
288 exec('import %s' % moduleName)
289
290
291
292
293 if moduleName != 'worker':
294 exec('reload(%s)' % moduleName)
295
296
297 dbSession = None
298 if clientRequest.domain().db():
299 dbSession = self.DBSession(clientRequest.domain())
300
301
302 exec('taskObj = %s.%s(self, clientRequest, dbSession)' % (moduleName, className))
303
304
305
306
307
308
309
310 try:
311 result = yield getattr(taskObj, 'remote_%s' % methodName)(
312 *clientRequest.args(),
313 **clientRequest.kw()
314 )
315
316 if dbSession:
317 dbSession.commit()
318
319
320 self.resultReady(self._id, result)
321
322 except Exception, e:
323
324 taskObj.log('*** Error in task: %s -=> %s' % (e.__class__, e.args))
325 dbSession.rollback()
326 self.callFailed(self._id, e)
327
328 self._mmap.close()
329