1
2
3
4
5
6
7
8
9
10
11 import os, sys, time, mmap
12 import cPickle
13 import xml.etree.ElementTree
14 import tempfile
15
16 from ConfigParser import SafeConfigParser
17 from string import Template
18
19 import zope.interface
20
21
22 from twisted.cred import portal, checkers, credentials, error
23 from twisted.internet import reactor, protocol, defer, threads, task, ssl
24 from twisted.spread import pb
25 from twisted.python import threadpool, log
26 from twisted.application import service, internet
27 from twisted.web import client, server, resource, static
28
29
30 from mcasynqueue import mcasynprocessworker
31
32
33 from translator import Translator
34 from constants import Constants
35 from workerpoolmanager import WorkerPoolManager
36 from servercodemanager import ServerCodeManager
37 from dbmanager import DBManager
38 from serverconf import ServerConf
39 from domain import DomainManager, Domain
40 from util import Util, MCPageCollector, MCResultsPager
41 from sysinfo import SysInfo
42 from mcjsonrpc import MCJsonRpc
43 from mcuser import MCUser
44 from clientrequest import ClientRequest
45
46 from services.masterserverservices import MasterServerServices
47 from services.slaveserverservices import SlaveServerServices
48 from services.clientservices import ClientServices
49 from services.workerservices import WorkerServices
50
51 from conn.serverrealm import ServerRealm
52 from conn.clientrealm import ClientRealm
53 from conn.clientchecker import ClientChecker
54 from conn.clientportal import ClientPortal
55 from conn.serversslfactory import ServerSSLFactory
56 from conn.clientsslfactory import ClientSSLFactory
57 from conn.slaveclientfactory import SlaveClientFactory
58
59 -class Server(object, service.Service):
60
62 '''
63 Initializes the application server.
64 '''
65 self._configuration = ServerConf(self)
66 self._domainManager = DomainManager(self)
67 self._serverCodeManager = ServerCodeManager()
68 self._dbManager = DBManager()
69 self._sysInfo = SysInfo()
70
71
72 self._masterConnection = None
73
74 self._pingTask = task.LoopingCall(self.pingServer)
75
76 self._pingReqSent = False
77
78
79 self._masterServerPerspective = None
80
81
82 self._masterServerServices = None
83
84
85 self._workerServices = WorkerServices(self)
86
87
88 self._clientRealm = None
89
90
91 self._slaveServers = []
92
93
94 self._runResults = {}
95
96
97 self._clientRequestDumps = {}
98
126
127 @defer.inlineCallbacks
149
150 @defer.inlineCallbacks
152 '''
153 Creates the worker manager and worker processes
154
155 @return: Deferred that will fire up when the pool is ready.
156 '''
157 self._workerPoolManager = WorkerPoolManager(self)
158
159 processIDs = yield self._workerPoolManager.start(self.configuration().numberOfWorkers())
160
161 defer.returnValue(None)
162
164 '''
165 Starts listening for the clients on the application server.
166 '''
167 services = ClientServices(self)
168 checker = ClientChecker(self)
169 self._clientRealm = ClientRealm(self, services)
170 clientportal = ClientPortal(self._clientRealm, [checker])
171
172 clientFactory = pb.PBServerFactory(clientportal)
173 clientSSLFactory = ClientSSLFactory()
174
175
176 port = reactor.listenSSL(Constants.CLIENT_LISTENING_SSL_PORT, clientFactory, clientSSLFactory)
177
178 return port
179
186
188 '''
189 Starts listening for the slave servers when this server is the master.
190 '''
191
192 if self.isMaster():
193 checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
194
195
196 for server in self.configuration().serverList():
197 checker.addUser(server[0], server[1])
198
199 services = MasterServerServices(self)
200 realm = ServerRealm(self, services)
201 serverPortal = portal.Portal(realm, [checker])
202
203 serverFactory = pb.PBServerFactory(serverPortal)
204 serverSSLFactory = ServerSSLFactory()
205
206
207 port = reactor.listenSSL(self.configuration().masterListeningPort(), serverFactory, serverSSLFactory)
208
209 return port
210
211 @defer.inlineCallbacks
238
239 @defer.inlineCallbacks
267
269 '''
270 Starts the task that periodically pings the master and disconnects
271 upon failed reply.
272 '''
273 self._pingTask.start(Constants.PING_TIMEOUT)
274
276 '''
277 Stops the task that periodically pings the master and disconnects
278 upon failed reply.
279 '''
280 if self._pingTask.running:
281 self._pingTask.stop()
282 self._pingReqSent = False
283
285 '''
286 Returns True if the ping task is running, False otherwise.
287 '''
288 return self._pingTask.running
289
291 '''
292 Periodically calls a remote ping method on the server. If the request
293 sent in the previous call is unanswered, then increments the missing ping
294 count. If missing ping count is exceeded, force the disconnect of the
295 remote user.
296 '''
297 def cb_pingSent(ignore):
298 self._pingReqSent = False
299
300 if not self._pingReqSent:
301 try:
302 lastPingSent = time.time()
303 self._pingReqSent = True
304 self._masterServerPerspective.callRemote('ping').addCallback(cb_pingSent)
305 return
306 except Exception, e:
307 log.msg(tr('sys00012') % str(e))
308
309
310 log.msg(tr('sys00013'))
311 self.forceDisconnect()
312
314 '''
315 Forcibly disconnects the slave from master. Master may or may not be
316 aware of the disconnection.
317 '''
318 if self._slaveFactory:
319 if self.isPingTaskRunning():
320 self.stopPingTask()
321 self._slaveFactory.disconnect()
322 if self._slaveFactory._broker and self._slaveFactory._broker.transport:
323 self._slaveFactory._broker.transport.loseConnection()
324
325 @defer.inlineCallbacks
327 '''
328 Reassigns the task to any available server, this method only works on Master
329
330 @param clientRequestDump: Dumped C{ClientRequest} request object which
331 includes all the details for the call.
332 @type clientRequestDump: str
333 '''
334 while True:
335
336
337 if self._workerPoolManager.availableProcesses() > 0:
338
339 reqID = Util.uniq()
340
341 self._runResults[reqID] = defer.Deferred()
342
343 self._clientRequestDumps[reqID] = clientRequestDump
344 runnerDeferred = self._workerPoolManager.run('workerEntry', reqID, self._workerServices)
345 runnerDeferred.addErrback(self.runFailed, reqID, None)
346 log.msg(tr('sys00009') % ClientRequest.loads(clientRequestDump).taskPath())
347
348 result = yield self._runResults[reqID]
349 defer.returnValue(result)
350
351
352
353 for slave in self.slaveServers():
354 availableProcesses = yield slave.callRemote('availableProcesses')
355
356 if availableProcesses > 0:
357
358 reqID = Util.uniq()
359
360 requestCollector = yield slave.callRemote('sendRequest', reqID)
361 pager = MCResultsPager(requestCollector, clientRequestDump)
362
363 yield pager.wait()
364
365 collector = yield slave.callRemote('assignTask', MCPageCollector(), reqID)
366
367 defer.returnValue(''.join(collector.pages))
368
369 yield Util.deferredSleep(1.0)
370
@defer.inlineCallbacks
def run(self, user, clientRequestDump):
    '''
    Runs a job on one of the process workers.

    The request is dispatched to the first option that applies:
      1. a local worker process, when the pool has one available;
      2. C{reassignTask}, when this server is the master;
      3. the master server (paged over the wire), when this server is a slave.

    @param user: The user that runs the task. When truthy, its dict form
        (C{user.toDict()}) is attached to the request.
    @param clientRequestDump: Dumped C{ClientRequest} request object which
        includes all the details for the call.
    @type clientRequestDump: str
    @return: Deferred that fires with the result of the call.
    '''
    clientRequest = ClientRequest.loads(clientRequestDump)
    clientRequest.setDomain(self.domainManager().domain(clientRequest.domainName()))

    if user:
        clientRequest.setUserDict(user.toDict())
    # NOTE(review): the request is re-serialised unconditionally so that the
    # domain set above is always part of the forwarded dump -- confirm this
    # matches the intended behaviour when no user is given.
    clientRequestDump = clientRequest.dumps()

    if self._workerPoolManager.availableProcesses() > 0:
        reqID = Util.uniq()

        # Keep a local reference to the pending deferred: runCompleted /
        # runFailed delete the dictionary entry when they fire, which may
        # happen before control returns here.
        pending = defer.Deferred()
        self._runResults[reqID] = pending

        # The worker asks for the request body later, keyed by reqID.
        self._clientRequestDumps[reqID] = clientRequestDump
        runnerDeferred = self._workerPoolManager.run('workerEntry', reqID, self._workerServices)
        runnerDeferred.addErrback(self.runFailed, reqID, None)
        log.msg(tr('sys00008') % clientRequest.taskPath())

        result = yield pending
    elif self.isMaster():
        # No local capacity: let the master logic pick any available server.
        result = yield self.reassignTask(clientRequestDump)
    else:
        # Slave without capacity: page the request to the master and ask it
        # to reassign the task. Nothing is stored in _runResults here, so
        # nothing is left behind once the call finishes.
        reqID = Util.uniq()

        requestCollector = yield self.masterServerServices().callRemote('sendRequest', reqID)
        pager = MCResultsPager(requestCollector, clientRequestDump)

        yield pager.wait()

        collector = yield self.masterServerServices().callRemote('reassignTask', MCPageCollector(), reqID)
        result = ''.join(collector.pages)

    defer.returnValue(result)
422
423
424
425
426
427
429 '''
430 Invoked by the worker service to signal parent server process
431 to send the client request for a specific run. This is done since
432 the parameters may also be large objects. Server then writes
433 the parameters to the given memory mapped file.
434
435 @param reqID: Client request id for the run (execute session)
436 @type reqID: str
437 @param mmapFileName: Memory mapped file name that the parameters
438 will be written to.
439 @type mmapFileName: str
440 '''
441
442 paramsPickle = self._clientRequestDumps[reqID]
443 dataSize = len(paramsPickle)
444
445
446 if not os.path.exists(mmapFileName):
447 mappedFile = open(mmapFileName, 'wb+')
448 mappedFile.write(' ')
449 mappedFile.close()
450
451
452 mappedFile = open(mmapFileName, 'rb+')
453 mp = mmap.mmap(mappedFile.fileno(), 0)
454
455
456 if mp.size() < dataSize + 8:
457
458 if sys.platform == 'darwin':
459
460 mappedFile = open(mmapFileName, 'wb+')
461 mappedFile.seek(dataSize + 8)
462 mappedFile.write(' ')
463 mappedFile.close()
464 mappedFile = open(mmapFileName, 'rb+')
465 mp = mmap.mmap(mappedFile.fileno(), 0)
466
467 else:
468 mp.resize(dataSize + 8)
469
470
471 mp[0:8] = "%08x" % dataSize
472 mp[8:dataSize + 8] = paramsPickle
473
474 del self._clientRequestDumps[reqID]
475
476 @defer.inlineCallbacks
478 '''
479 Called by the worker to inform server about an incoming message
480 to be transferred to other clients connected to a specific
481 domain.
482 '''
483 self._runResults[reqID] = defer.Deferred()
484 clientRequestDump = yield self._runResults[reqID]
485
486
487 self.sendMsg(None, Util.loads(clientRequestDump))
488
490 '''
491 Invoked by the worker service to signal parent server process
492 about the availability of the result of a specific call. Server
493 then reads the result from the given memory mapped file and
494 fires the related deferred with the result.
495
496 @param reqID: Unique id for the run (execute session)
497 @type reqID: str
498 @param mmapFileName: Memory mapped file name that the result was
499 written in
500 @type mmapFileName: str
501 '''
502 mappedFile = open(mmapFileName, 'rb+')
503 mp = mmap.mmap(mappedFile.fileno(), 0)
504
505 dataSize = int(mp[0:8], 16)
506
507 result = mp[8:dataSize + 8]
508 self._runResults[reqID].callback(result)
509 del self._runResults[reqID]
510
def runFailed(self, failureOrNone, reqID, mmapFileName):
    '''
    Invoked by the worker service to signal parent server process
    about the error of a specific call. Server then reads the error
    from the given memory mapped file and fires the related deferred
    with it. It may also be attached to a callback chain to catch
    normal errors.

    @param failureOrNone: Used for chaining deferreds, normally used
        by the server process itself. If this parameter is truthy it is
        used as the error and the memory mapped file is not read.
    @type failureOrNone: Failure or NoneType
    @param reqID: Unique id for the run (execute session)
    @type reqID: str
    @param mmapFileName: Memory mapped file name that the error was
        written in. Only used when C{failureOrNone} is falsy.
    @type mmapFileName: str
    @raise KeyError: If no pending result is registered for C{reqID}.
    '''
    if failureOrNone:
        failure = failureOrNone
    else:
        mappedFile = open(mmapFileName, 'rb+')
        try:
            mp = mmap.mmap(mappedFile.fileno(), 0)
            try:
                # First 8 characters hold the payload length in hex.
                failureSize = int(mp[0:8], 16)
                # Slicing copies the payload, so the map can be closed.
                failure = mp[8:failureSize + 8]
            finally:
                mp.close()
        finally:
            mappedFile.close()

        # NOTE(review): Util.loads presumably deserialises the dumped
        # error written by the worker -- confirm the payload is trusted.
        try:
            failure = Util.loads(failure)
        except TypeError as e:
            # The dumped error could not be rebuilt; report that instead.
            failure = e

    # Remove the entry first so it is gone even if firing raises.
    pending = self._runResults.pop(reqID)
    pending.errback(failure)
548
549 @defer.inlineCallbacks
551 '''
552 Sends the given message to I{this} server's clients.
553 '''
554 for uniqID, user in self._clientRealm.getUsers().items():
555 cr = ClientRequest.loads(clientRequestDump)
556
557 if user.domainName() == cr.domainName():
558
559 reqID = Util.uniq()
560
561 requestCollector = yield user.client().callRemote('sendRequest', reqID)
562 pager = MCResultsPager(requestCollector, clientRequestDump)
563
564 yield pager.wait()
565
566 yield user.client().callRemote('messageArrived', reqID)
567
568 yield None
569
@defer.inlineCallbacks
def sendMsg(self, user, clientRequestDump):
    '''
    Used by clients to send messages to other clients connected to the
    same domain.

    On the master the message is delivered to the master's own clients
    and then paged to every connected slave server, which is asked to
    deliver it to its clients. On a slave the message is paged to the
    master, which is asked to run C{sendMsg} in turn.

    @param user: Avatar calling this method (not used by this method)
    @type user: C{MCUser}
    @param clientRequestDump: Message to be passed, dumped
    @type clientRequestDump: str
    @return: Deferred that fires when the message has been handed over.
    '''
    if self.isMaster():
        yield self.sendMsgToConnectedClients(clientRequestDump)

        # Iterate over a snapshot: slaves may connect or disconnect while
        # this generator is suspended at a yield, and mutating the live
        # list during iteration would skip entries.
        for slave in list(self.slaveServers()):
            reqID = Util.uniq()

            # Page the (possibly large) dump over, then trigger delivery.
            requestCollector = yield slave.callRemote('sendRequest', reqID)
            pager = MCResultsPager(requestCollector, clientRequestDump)

            yield pager.wait()

            yield slave.callRemote('sendMsgToConnectedClients', reqID)
    else:
        reqID = Util.uniq()

        requestCollector = yield self.masterServerServices().callRemote('sendRequest', reqID)
        pager = MCResultsPager(requestCollector, clientRequestDump)

        yield pager.wait()

        yield self.masterServerServices().callRemote('sendMsg', reqID)
609
610
611 @defer.inlineCallbacks
637
639 '''
640 Implements necessary logic when a user is about to login.
641
642 @param user: User that is about to be logged in
643 @type user: MCUser
644 '''
645 pass
646
648 '''
649 Implements necessary logic when a user is about to logout.
650
651 @param user: User that is about to be logged out
652 @type user: MCUser
653 '''
654 pass
655
657 '''
658 Stops the application server.
659 '''
660 self.stopPingTask()
661 log.msg(tr('sys00002'))
662
664 '''
665 Returns the worker pool manager
666 '''
667 return self._workerPoolManager
668
670 '''
671 Returns the current working directory of the application server.
672
673 @rtype: string
674 @return: Current working directory.
675 '''
676 return self._directory
677
679 '''
680 Returns the configuration of the application server.
681
682 @rtype: ServerConf
683 @return: Conf of the server
684 '''
685 return self._configuration
686
688 '''
689 Sets the configuration of the application server.
690
691 @param conf: Server configuration data instance
692 @type conf: ServerConf
693 '''
694 self._configuration = conf
695
def domainManager(self):
    '''
    Accessor for the domain manager held by the application server.

    @return: The server's domain manager
    @rtype: DomainManager
    '''
    manager = self._domainManager
    return manager
704
706 '''
707 Returns the code manager of the application server.
708
709 @return: Server code manager of the server
710 @rtype: ServerCodeManager
711 '''
712 return self._serverCodeManager
713
715 '''
716 Returns the database manager of the application server.
717
718 @return: Database manager of the server
719 @rtype: DBManager
720 '''
721 return self._dbManager
722
724 '''
725 Returns the master server of the server if the server is slave
726 else return None.
727
728 @return: Master server of the server
729 @rtype: Server
730 '''
731 return self._masterServer
732
734 '''
735 Sets the master server of the application server.
736
737 @param masterServer: The master server
738 '''
739 self._masterServer = masterServer
740
742 '''
743 Returns the master server services of the master server if the server is slave
744 else return None.
745
746 @return: Master server of the server
747 @rtype: Services
748 '''
749 return self._masterServerServices
750
752 '''
753 Sets the master server services of the application server.
754 '''
755 self._masterServerServices = masterServerServices
756
758 '''
759 Returns the master server perspective of the server if the server is slave
760 else return None.
761
762 @rtype: ServerPerspective
763 @return: Master server perspective of the server
764 '''
765 return self._masterServerPerspective
766
768 '''
769 Sets the master server perspective of the application server.
770 '''
771 self._masterServerPerspective = masterServerPerspective
772
778
780 '''
781 Returns the list of connected slave servers.
782 '''
783 return self._slaveServers
784
786 '''
787 A Slave server is connected.
788 '''
789 self._slaveServers.append(mind)
790
792 '''
793 A Slave server is disconnected.
794 '''
795 self._slaveServers.remove(mind)
796
797 @defer.inlineCallbacks
799 '''
800 Calls a function from the master server.
801 '''
802 if self.masterServerServices():
803 result = yield self.masterServerServices().callRemote('test', 11)
804 print 'Master Server Connection Test Result:', result
805
806 @defer.inlineCallbacks
808 '''
809 Calls a function from the slave server.
810 '''
811 for slave in self.slaveServers():
812 result = yield slave.callRemote('test', 10)
813 print 'Slave Server Connection Test Result:', result
814