[MOBY-guts] biomoby commit
Yan Wong
yanwong at pub.open-bio.org
Fri Feb 4 12:58:25 UTC 2005
yanwong
Fri Feb 4 07:58:24 EST 2005
Update of /home/repository/moby/moby-live/Python/bioMoby/webservice
In directory pub.open-bio.org:/tmp/cvs-serv29563/webservice
Modified Files:
Dispatcher.py
Log Message:
moby-live/Python/bioMoby/webservice Dispatcher.py,1.3,1.4
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2005/02/01 08:52:17 1.3
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2005/02/04 12:58:24 1.4
@@ -88,9 +88,9 @@
formatter: a function that transforms raw results from the Invocator into Moby Objects
workdir: the session directory
"""
-
+
from bioMoby import MobyUnmarshaller
-
+
self._isbase64=False
self._workdir=workdir
self.answers={}
@@ -105,7 +105,7 @@
#Do some cleaning on the XML Document before proceeding...
#Remove all \t|\n from the <a>\n\t</a>
um=MobyUnmarshaller()
-
+
r=re.compile("\>[\n|\t]+\<")
mc=um.loads("><".join(r.split(mobyContentXML)))
except:
@@ -114,7 +114,7 @@
self._isbase64=True
except binascii.Error:
raise EDispatchError, "Invalid types in the moby Content object"
-
+
self.queryData=mc.queryData
self.invocator=Invocator
self._invocatorParameters=invocatorParameters
@@ -137,7 +137,7 @@
return base64.encodestring(str(MobyContent(self.answers)))
return str(MobyContent(self.answers))
-
+
class SimpleDispatcher(AbstractDispatcher):
""" A simple dispatcher execute sequentially the treatments
@@ -167,14 +167,14 @@
#Clean the session directory
os.chdir(self._workdir)
shutil.rmtree(self._workdir+"/session"+self._sessionid)
-
+
def _fetchAnswers(self):
""" Fetch the answers and clean the session directory
"""
from bioMoby import GeneralInvocator
answers={}
-
+
ruserdir="session"+self._sessionid
try:
l=os.listdir(self._workdir+"/"+ruserdir)
@@ -188,7 +188,7 @@
aQueryName=querykey[8:]
gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
answers[aQueryName]=[gi.getResults()]
-
+
if self.formatter != None:
answers[aQueryName]=self.formatter(answers[aQueryName][0])
@@ -196,13 +196,13 @@
self.answers=answers
self._clean()
-
+
del GeneralInvocator
-
+
def execute(self):
"""Execute the queries sequentially
"""
-
+
try:
for key in self.queryData.keys():
query=self.queryData[key]
@@ -229,7 +229,7 @@
""" Execute a single query, store the result in the answers attribute
"""
from bioMoby import MobyString
-
+
query=self.queryData[queryKey]
#Execute the query
@@ -247,8 +247,7 @@
self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
self._clean()
-
-
+
del MobyString
@@ -261,14 +260,13 @@
mt=MultiThread(self._execute, self.queryData.keys())
mt.start()
mt.join()
-
+
self._fetchAnswers()
except:
self._clean()
-
+
return self._toMoby()
-
class PBSDispatcher(SimpleDispatcher):
"""Dispatcher for PBS jobs
"""
@@ -306,10 +304,10 @@
queryKeys.append(qk)
#get the associated Outputfiles
outputfiles[qk]=queryInformation.split(";")[2].split("::")
-
+
#Do a qstat and check if the job is still running or on queue
i, o, e = os.popen3("qstat %s"%qsubId)
-
+
orl=o.readlines()
erl=e.readlines()
@@ -330,13 +328,13 @@
fp=file(tdir+"/results-"+querykey+".out")
s=fp.readlines()
fp.close()
-
+
if len(s)==0:
os.remove(tdir+"/results-"+querykey+".out")
else:
if len(outputfiles.keys())>0:
shutil.copy(tdir+"/results-"+querykey+".out", tdir+"/results-"+querykey+"/results.out")
-
+
for f in outputfiles[querykey]:
ruserdir=tdir+"/results-"+querykey
# if it is a file then retrieve the file
@@ -350,7 +348,7 @@
shutil.copy(f+"/"+fs, ruserdir+"/"+fs)
os.remove(f+"/"+fs)
os.removedirs(f)
-
+
#retrieve all the results
try:
l=os.listdir(tdir)
@@ -363,7 +361,7 @@
aQueryName=querykey[8:]
gi=GeneralInvocator(aQueryName, userdir=tdir)
answers[aQueryName]=[gi.getResults()]
-
+
if self.formatter != None:
answers[aQueryName]=self.formatter(answers[aQueryName][0])
@@ -374,14 +372,13 @@
del GeneralInvocator
-
def execute(self):
"""Execute the command in a PBS script using the PBSInvocator
"""
-
+
if 'JOBSESSION' in self.queryData.keys():
self.answers=self.queryData
-
+
self._getResults()
else:
#we need to store informations in a file
@@ -393,7 +390,7 @@
r.setParameters(self._invocatorParameters)
r.execute()
fp.close()
-
+
return self._toMoby()
class Dispatcher(SimpleDispatcher):
@@ -422,7 +419,7 @@
""" Execute a single query, store the result in the answers attribute
"""
from bioMoby import MobyString
-
+
query=self.queryData[queryKey]
#Execute the query
@@ -437,7 +434,7 @@
fp.write("The query: "+queryKey+" has not been processed")
traceback.print_exc(file=fp)
fp.close()
-
+
del MobyString
def _getSem(self):
@@ -445,14 +442,14 @@
#Wait for answers of all queries
self._sem.wait()
self._fetchAnswers()
-
+
#Remove the IPC
import pyipc
pyipc.removeIPC(self._sem)
del pyipc
except:
pass
-
+
def execute(self, timeout=120):
"""Execute the queries with a pool of threads, store the results in a mobyContent Object
"""
@@ -461,11 +458,11 @@
#Do a fetchAnswers if the user has a sessionid
if 'JOBSESSION' in self.queryData.keys():
self.answers=self.queryData
-
+
t=threading.Thread(target=self._getSem)
t.start()
t.join(timeout)
-
+
return self._toMoby()
else:
#A MobyContent without jobsession id has been received
@@ -474,14 +471,14 @@
pid=os.fork()
except:
raise Exception, "Cannot treat any further request, please retry"
-
+
if pid>0:
#try to get the results
# os.wait()
t=threading.Thread(target=self._getSem)
t.start()
t.join(timeout)
-
+
return self._toMoby()
else:
#do second fork
@@ -506,7 +503,7 @@
mt.join()
except:
self._clean()
-
+
try:
self._sem.signal()
finally:
More information about the MOBY-guts
mailing list