[MOBY-guts] biomoby commit

Yan Wong yanwong at pub.open-bio.org
Fri Feb 4 12:58:25 UTC 2005


yanwong
Fri Feb  4 07:58:24 EST 2005
Update of /home/repository/moby/moby-live/Python/bioMoby/webservice
In directory pub.open-bio.org:/tmp/cvs-serv29563/webservice

Modified Files:
	Dispatcher.py 
Log Message:


moby-live/Python/bioMoby/webservice Dispatcher.py,1.3,1.4
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py	2005/02/01 08:52:17	1.3
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py	2005/02/04 12:58:24	1.4
@@ -88,9 +88,9 @@
             formatter: a function that transforms raw results from the Invocator into Moby Objects
             workdir: the session directory
         """
-
+        
         from bioMoby import MobyUnmarshaller
-
+        
         self._isbase64=False
         self._workdir=workdir
         self.answers={}
@@ -105,7 +105,7 @@
                 #Do some cleaning on the XML Document before proceeding...
                 #Remove all \t|\n from the <a>\n\t</a>
                 um=MobyUnmarshaller()
-
+                
                 r=re.compile("\>[\n|\t]+\<")
                 mc=um.loads("><".join(r.split(mobyContentXML)))
             except:
@@ -114,7 +114,7 @@
                     self._isbase64=True
                 except binascii.Error:
                     raise  EDispatchError, "Invalid types in the moby Content object"
-
+        
         self.queryData=mc.queryData
         self.invocator=Invocator
         self._invocatorParameters=invocatorParameters
@@ -137,7 +137,7 @@
             return base64.encodestring(str(MobyContent(self.answers)))
         
         return str(MobyContent(self.answers))
-
+        
         
 class SimpleDispatcher(AbstractDispatcher):
     """ A simple dispatcher execute sequentially the treatments
@@ -167,14 +167,14 @@
         #Clean the session directory
         os.chdir(self._workdir)
         shutil.rmtree(self._workdir+"/session"+self._sessionid)
-            
+    
     def _fetchAnswers(self):
         """ Fetch the answers and clean the session directory
         """
         from bioMoby import GeneralInvocator
         
         answers={}
-
+        
         ruserdir="session"+self._sessionid
         try:
             l=os.listdir(self._workdir+"/"+ruserdir)
@@ -188,7 +188,7 @@
                     aQueryName=querykey[8:]
                     gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
                     answers[aQueryName]=[gi.getResults()]
-
+                    
                     if self.formatter != None:
                         answers[aQueryName]=self.formatter(answers[aQueryName][0])
                         
@@ -196,13 +196,13 @@
                 self.answers=answers
                 
                 self._clean()        
-
+        
         del GeneralInvocator
-                
+    
     def execute(self):
         """Execute the queries sequentially
         """
-
+        
         try:
             for key in self.queryData.keys():
                 query=self.queryData[key]
@@ -229,7 +229,7 @@
         """ Execute a single query, store the result in the answers attribute
         """
         from bioMoby import MobyString
-
+        
         query=self.queryData[queryKey]
         
         #Execute the query
@@ -247,8 +247,7 @@
             
             self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
             self._clean()
-
-
+        
         del MobyString
 
     
@@ -261,14 +260,13 @@
             mt=MultiThread(self._execute, self.queryData.keys())
             mt.start()
             mt.join()
-
+            
             self._fetchAnswers()
         except:
             self._clean()
-            
+        
         return self._toMoby()
 
-        
 class PBSDispatcher(SimpleDispatcher):
     """Dispatcher for PBS jobs
     """
@@ -306,10 +304,10 @@
             queryKeys.append(qk)
             #get the associated Outputfiles
             outputfiles[qk]=queryInformation.split(";")[2].split("::")
-
+            
             #Do a qstat and check if the job is still running or on queue
             i, o, e = os.popen3("qstat %s"%qsubId)
-
+            
             orl=o.readlines()
             erl=e.readlines()
             
@@ -330,13 +328,13 @@
                 fp=file(tdir+"/results-"+querykey+".out")
                 s=fp.readlines()
                 fp.close()
-
+                
                 if len(s)==0:
                     os.remove(tdir+"/results-"+querykey+".out")
                 else:
                     if len(outputfiles.keys())>0:
                         shutil.copy(tdir+"/results-"+querykey+".out", tdir+"/results-"+querykey+"/results.out")
-
+            
             for f in outputfiles[querykey]:
                 ruserdir=tdir+"/results-"+querykey
                 # if it is a file then retrieve the file
@@ -350,7 +348,7 @@
                         shutil.copy(f+"/"+fs, ruserdir+"/"+fs)
                         os.remove(f+"/"+fs)
                     os.removedirs(f)
-
+        
         #retrieve all the results
         try:
             l=os.listdir(tdir)
@@ -363,7 +361,7 @@
                     aQueryName=querykey[8:]
                     gi=GeneralInvocator(aQueryName, userdir=tdir)
                     answers[aQueryName]=[gi.getResults()]
-
+                    
                     if self.formatter != None:
                         answers[aQueryName]=self.formatter(answers[aQueryName][0])
                         
@@ -374,14 +372,13 @@
         
         del GeneralInvocator
 
-        
     def execute(self):
         """Execute the command in a PBS script using the PBSInvocator
         """
-
+        
         if 'JOBSESSION' in self.queryData.keys():
             self.answers=self.queryData
-
+            
             self._getResults()
         else:
             #we need to store informations in a file
@@ -393,7 +390,7 @@
                 r.setParameters(self._invocatorParameters)
                 r.execute()
             fp.close()
-
+        
         return self._toMoby()
 
 class Dispatcher(SimpleDispatcher):
@@ -422,7 +419,7 @@
         """ Execute a single query, store the result in the answers attribute
         """
         from bioMoby import MobyString
-
+        
         query=self.queryData[queryKey]
         
         #Execute the query
@@ -437,7 +434,7 @@
             fp.write("The query: "+queryKey+" has not been processed")
             traceback.print_exc(file=fp)
             fp.close()
-
+        
         del MobyString
 
     def _getSem(self):
@@ -445,14 +442,14 @@
             #Wait for answers of all queries
             self._sem.wait()
             self._fetchAnswers()
-
+            
             #Remove the IPC
             import pyipc
             pyipc.removeIPC(self._sem)
             del pyipc
         except:
             pass
-        
+    
     def execute(self, timeout=120):
         """Execute the queries with a pool of threads, store the results in a mobyContent Object
         """
@@ -461,11 +458,11 @@
         #Do a fetchAnswers if the user has a sessionid
         if 'JOBSESSION' in self.queryData.keys():
             self.answers=self.queryData
-
+            
             t=threading.Thread(target=self._getSem)
             t.start()
             t.join(timeout)
-
+            
             return self._toMoby()
         else:
             #A MobyContent without jobsession id has been received
@@ -474,14 +471,14 @@
                 pid=os.fork()
             except:
                 raise Exception, "Cannot treat any further request, please retry"
-
+            
             if pid>0:
                 #try to get the results
 #                os.wait()
                 t=threading.Thread(target=self._getSem)
                 t.start()
                 t.join(timeout)
-
+                
                 return self._toMoby()
             else:
                 #do second fork
@@ -506,7 +503,7 @@
                         mt.join()
                     except:
                         self._clean()
-
+                    
                     try:
                         self._sem.signal()
                     finally:




More information about the MOBY-guts mailing list