[MOBY-guts] biomoby commit
Yan Wong
yanwong at pub.open-bio.org
Tue Feb 1 08:52:17 UTC 2005
yanwong
Tue Feb 1 03:52:17 EST 2005
Update of /home/repository/moby/moby-live/Python/bioMoby/webservice
In directory pub.open-bio.org:/tmp/cvs-serv12609/webservice
Modified Files:
Dispatcher.py Invocators.py TCBioMoby.py __init__.py
Log Message:
moby-live/Python/bioMoby/webservice Dispatcher.py,1.2,1.3 Invocators.py,1.2,1.3 TCBioMoby.py,1.2,1.3 __init__.py,1.2,1.3
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2005/01/18 13:46:24 1.2
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2005/02/01 08:52:17 1.3
@@ -76,7 +76,7 @@
def __str__(self):
return "An error occurs during the Dispatcher process: "
-
+
class AbstractDispatcher:
""" Subclass this one to create your own dispatcher class
a Dispatcher class must have a execute method
@@ -91,16 +91,16 @@
from bioMoby import MobyUnmarshaller
- self._isbase64=False
- self._workdir=workdir
- self.answers={}
-
- #First see if we have a MobyContent Object or a string
- mc=mobyContentXML
- self._isString=False
-
- if not hasattr(mobyContentXML, 'queryData'):
- self._isString=True
+ self._isbase64=False
+ self._workdir=workdir
+ self.answers={}
+
+ #First see if we have a MobyContent Object or a string
+ mc=mobyContentXML
+ self._isString=False
+
+ if not hasattr(mobyContentXML, 'queryData'):
+ self._isString=True
try:
#Do some cleaning on the XML Document before proceeding...
#Remove all \t|\n from the <a>\n\t</a>
@@ -110,117 +110,117 @@
mc=um.loads("><".join(r.split(mobyContentXML)))
except:
try:
- mc=um.loads(base64.decodestring(mobyContentXML))
- self._isbase64=True
- except binascii.Error:
- raise EDispatchError, "Invalid types in the moby Content object"
+ mc=um.loads(base64.decodestring(mobyContentXML))
+ self._isbase64=True
+ except binascii.Error:
+ raise EDispatchError, "Invalid types in the moby Content object"
self.queryData=mc.queryData
self.invocator=Invocator
- self._invocatorParameters=invocatorParameters
+ self._invocatorParameters=invocatorParameters
self.commandBuilder=CommandBuilder
self.formatter=formatter
-
- del MobyUnmarshaller
+
+ del MobyUnmarshaller
def execute(self):
- """ Override this method
- """
- pass
+ """ Override this method
+ """
+ pass
def _toMoby(self):
- """Return the results as a MobyContent Object
- """
- from bioMoby import MobyContent
-
- if self._isbase64:
- return base64.encodestring(str(MobyContent(self.answers)))
-
+ """Return the results as a MobyContent Object
+ """
+ from bioMoby import MobyContent
+
+ if self._isbase64:
+ return base64.encodestring(str(MobyContent(self.answers)))
+
return str(MobyContent(self.answers))
-
+
class SimpleDispatcher(AbstractDispatcher):
""" A simple dispatcher execute sequentially the treatments
"""
def __init__(self, mobyContentXML, Invocator, InvocatorParameters, CommandBuilder, formatter=None, workdir="/tmp"):
- AbstractDispatcher.__init__(self, mobyContentXML, Invocator, InvocatorParameters, CommandBuilder, formatter, workdir)
-
- from bioMoby import MobyObject
-
- if 'JOBSESSION' in self.queryData.keys():
- self._sessionid=self.queryData['JOBSESSION'][0].id
- else:
- self._sessionid=`random.randint(0,sys.maxint-1)`
-
- while "session"+self._sessionid in os.listdir(self._workdir):
- self._sessionid=`random.randint(0,sys.maxint-1)`
-
+ AbstractDispatcher.__init__(self, mobyContentXML, Invocator, InvocatorParameters, CommandBuilder, formatter, workdir)
+
+ from bioMoby import MobyObject
+
+ if 'JOBSESSION' in self.queryData.keys():
+ self._sessionid=self.queryData['JOBSESSION'][0].id
+ else:
+ self._sessionid=`random.randint(0,sys.maxint-1)`
+
+ while "session"+self._sessionid in os.listdir(self._workdir):
+ self._sessionid=`random.randint(0,sys.maxint-1)`
+
os.mkdir(self._workdir+"/session"+self._sessionid)
-
- os.chdir(self._workdir+"/session"+self._sessionid)
-
+
+ os.chdir(self._workdir+"/session"+self._sessionid)
+
self.answers={'JOBSESSION':[MobyObject(namespace="SESSIONID", id=self._sessionid)]}
def _clean(self):
- """Clean the session directory
- """
+ """Clean the session directory
+ """
#Clean the session directory
os.chdir(self._workdir)
shutil.rmtree(self._workdir+"/session"+self._sessionid)
-
+
def _fetchAnswers(self):
- """ Fetch the answers and clean the session directory
- """
- from bioMoby import GeneralInvocator
-
- answers={}
+ """ Fetch the answers and clean the session directory
+ """
+ from bioMoby import GeneralInvocator
+
+ answers={}
- ruserdir="session"+self._sessionid
+ ruserdir="session"+self._sessionid
try:
- l=os.listdir(self._workdir+"/"+ruserdir)
- except:
- l=[]
-
- if len(l)>0:
- #There is a session directory
+ l=os.listdir(self._workdir+"/"+ruserdir)
+ except:
+ l=[]
+
+ if len(l)>0:
+ #There is a session directory
for querykey in l:
- if querykey[:8]=="results-":
- aQueryName=querykey[8:]
- gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
- answers[aQueryName]=[gi.getResults()]
+ if querykey[:8]=="results-":
+ aQueryName=querykey[8:]
+ gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
+ answers[aQueryName]=[gi.getResults()]
if self.formatter != None:
answers[aQueryName]=self.formatter(answers[aQueryName][0])
-
+
if len(answers.keys())>0:
- self.answers=answers
-
- self._clean()
+ self.answers=answers
+
+ self._clean()
- del GeneralInvocator
-
+ del GeneralInvocator
+
def execute(self):
- """Execute the queries sequentially
- """
+ """Execute the queries sequentially
+ """
- try:
+ try:
for key in self.queryData.keys():
- query=self.queryData[key]
- r=self.invocator(key, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
- r.execute()
-
- self._fetchAnswers()
- except:
- import traceback
- import StringIO
- from bioMoby import MobyString
- sio=StringIO.StringIO()
- traceback.print_exc(file=sio)
-
- self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
- self._clean()
-
- return self._toMoby()
+ query=self.queryData[key]
+ r=self.invocator(key, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
+ r.execute()
+
+ self._fetchAnswers()
+ except:
+ import traceback
+ import StringIO
+ from bioMoby import MobyString
+ sio=StringIO.StringIO()
+ traceback.print_exc(file=sio)
+
+ self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
+ self._clean()
+
+ return self._toMoby()
class MultiThreadDispatcher(SimpleDispatcher):
""" Use a MultiThread to execute a query
@@ -231,114 +231,114 @@
from bioMoby import MobyString
query=self.queryData[queryKey]
-
- #Execute the query
+
+ #Execute the query
try:
r=self.invocator(queryKey, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
- r.execute()
+ r.execute()
time.sleep(0)
-
+
except:
- #If there is a problem, we return the generated exception
- import traceback
- import StringIO
- sio=StringIO.StringIO()
- traceback.print_exc(file=sio)
-
- self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
- self._clean()
+ #If there is a problem, we return the generated exception
+ import traceback
+ import StringIO
+ sio=StringIO.StringIO()
+ traceback.print_exc(file=sio)
+
+ self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
+ self._clean()
del MobyString
def execute(self):
- """ Execute the queries with threads
- """
-
- try:
+ """ Execute the queries with threads
+ """
+
+ try:
os.chdir(self._workdir+"/session"+self._sessionid)
mt=MultiThread(self._execute, self.queryData.keys())
mt.start()
mt.join()
- self._fetchAnswers()
+ self._fetchAnswers()
except:
- self._clean()
-
- return self._toMoby()
+ self._clean()
+
+ return self._toMoby()
-
+
class PBSDispatcher(SimpleDispatcher):
"""Dispatcher for PBS jobs
"""
def __init__(self, mobyContentXML, InvocatorParameters, CommandBuilder, formatter=None, workdir="/tmp"):
- """ PBS Dispatcher work with PBSInvocator
- it uses a session directory, a session file (where the queries IDs are stored)
- when the execute method is call with an session id, it reads all entries from the session file and
- make a qstat on each ids.
- """
- from bioMoby.webservice import PBSInvocator
-
- SimpleDispatcher.__init__(self, mobyContentXML, PBSInvocator, InvocatorParameters, CommandBuilder, formatter, workdir)
+ """ PBS Dispatcher work with PBSInvocator
+ it uses a session directory, a session file (where the queries IDs are stored)
+ when the execute method is call with an session id, it reads all entries from the session file and
+ make a qstat on each ids.
+ """
+ from bioMoby.webservice import PBSInvocator
+
+ SimpleDispatcher.__init__(self, mobyContentXML, PBSInvocator, InvocatorParameters, CommandBuilder, formatter, workdir)
def _getResults(self):
- """ Do a qstat and check availability of results
- """
- from bioMoby import GeneralInvocator
-
- answers={}
- outputfiles={}
- queryKeys=[]
-
- #Read the sessions file, in which there are all queries identifiers
-
- fp=file("sessions","r")
- for queryInformation in fp.readlines():
- #for each queries, retrieve the qsub ids, and associated output files.
- #delete the CR at the end of the line before proceding
- queryInformation=queryInformation[:-1]
- #Get the IDs
- qsubId=queryInformation.split(";")[1].split(".")[0]
- qk=queryInformation.split(";")[0]
- #Get the query key
- queryKeys.append(qk)
+ """ Do a qstat and check availability of results
+ """
+ from bioMoby import GeneralInvocator
+
+ answers={}
+ outputfiles={}
+ queryKeys=[]
+
+ #Read the sessions file, in which there are all queries identifiers
+
+ fp=file("sessions","r")
+ for queryInformation in fp.readlines():
+ #for each queries, retrieve the qsub ids, and associated output files.
+ #delete the CR at the end of the line before proceding
+ queryInformation=queryInformation[:-1]
+ #Get the IDs
+ qsubId=queryInformation.split(";")[1].split(".")[0]
+ qk=queryInformation.split(";")[0]
+ #Get the query key
+ queryKeys.append(qk)
#get the associated Outputfiles
- outputfiles[qk]=queryInformation.split(";")[2].split("::")
+ outputfiles[qk]=queryInformation.split(";")[2].split("::")
- #Do a qstat and check if the job is still running or on queue
- i, o, e = os.popen3("qstat %s"%qsubId)
+ #Do a qstat and check if the job is still running or on queue
+ i, o, e = os.popen3("qstat %s"%qsubId)
- orl=o.readlines()
- erl=e.readlines()
-
- if len(orl)>0:
- #if one job is not finished, exit the _getResults function
- #The dispatcher will give all the results or none
- return None
-
- fp.close()
-
- # At this step, all jobs are done
- tdir=self._workdir+"/session"+self._sessionid
-
-
- for querykey in queryKeys:
- #check the results-query.out:
- if "results-"+querykey+".out" in os.listdir(tdir):
+ orl=o.readlines()
+ erl=e.readlines()
+
+ if len(orl)>0:
+ #if one job is not finished, exit the _getResults function
+ #The dispatcher will give all the results or none
+ return None
+
+ fp.close()
+
+ # At this step, all jobs are done
+ tdir=self._workdir+"/session"+self._sessionid
+
+
+ for querykey in queryKeys:
+ #check the results-query.out:
+ if "results-"+querykey+".out" in os.listdir(tdir):
fp=file(tdir+"/results-"+querykey+".out")
- s=fp.readlines()
- fp.close()
+ s=fp.readlines()
+ fp.close()
if len(s)==0:
- os.remove(tdir+"/results-"+querykey+".out")
- else:
- if len(outputfiles.keys())>0:
- shutil.copy(tdir+"/results-"+querykey+".out", tdir+"/results-"+querykey+"/results.out")
+ os.remove(tdir+"/results-"+querykey+".out")
+ else:
+ if len(outputfiles.keys())>0:
+ shutil.copy(tdir+"/results-"+querykey+".out", tdir+"/results-"+querykey+"/results.out")
for f in outputfiles[querykey]:
- ruserdir=tdir+"/results-"+querykey
+ ruserdir=tdir+"/results-"+querykey
# if it is a file then retrieve the file
if os.path.isfile(f):
aTemp=f.split('/')
@@ -351,48 +351,48 @@
os.remove(f+"/"+fs)
os.removedirs(f)
- #retrieve all the results
+ #retrieve all the results
try:
- l=os.listdir(tdir)
- except:
- l=[]
-
- if len(l)>0:
+ l=os.listdir(tdir)
+ except:
+ l=[]
+
+ if len(l)>0:
for querykey in l:
- if querykey[:8]=="results-":
- aQueryName=querykey[8:]
- gi=GeneralInvocator(aQueryName, userdir=tdir)
- answers[aQueryName]=[gi.getResults()]
+ if querykey[:8]=="results-":
+ aQueryName=querykey[8:]
+ gi=GeneralInvocator(aQueryName, userdir=tdir)
+ answers[aQueryName]=[gi.getResults()]
if self.formatter != None:
answers[aQueryName]=self.formatter(answers[aQueryName][0])
-
+
if len(answers.keys())>0:
- self.answers=answers
-
+ self.answers=answers
+
self._clean()
-
- del GeneralInvocator
+
+ del GeneralInvocator
-
+
def execute(self):
- """Execute the command in a PBS script using the PBSInvocator
- """
+ """Execute the command in a PBS script using the PBSInvocator
+ """
if 'JOBSESSION' in self.queryData.keys():
- self.answers=self.queryData
+ self.answers=self.queryData
- self._getResults()
- else:
- #we need to store informations in a file
- #query id, qsubid, outputfiles
- fp=file("sessions","w")
- for queryKey in self.queryData.keys():
- query=self.queryData[queryKey]
- r=self.invocator(queryKey, self.commandBuilder(query), fp, userdir=self._workdir+"/session"+self._sessionid)
- r.setParameters(self._invocatorParameters)
- r.execute()
- fp.close()
+ self._getResults()
+ else:
+ #we need to store informations in a file
+ #query id, qsubid, outputfiles
+ fp=file("sessions","w")
+ for queryKey in self.queryData.keys():
+ query=self.queryData[queryKey]
+ r=self.invocator(queryKey, self.commandBuilder(query), fp, userdir=self._workdir+"/session"+self._sessionid)
+ r.setParameters(self._invocatorParameters)
+ r.execute()
+ fp.close()
return self._toMoby()
@@ -405,18 +405,18 @@
The commandBuilder function shall return a command, arguments, and a set of temporary files
[queryData]-->("commandName","commandArgs", ["tempfiles"])
"""
- SimpleDispatcher.__init__(self, mobyContentXML, Invocator, None, CommandBuilder, formatter, workdir)
-
- import pyipc
-
- #Create a Jobsession ID, a semaphore and a session directory
- if 'JOBSESSION' in self.queryData.keys():
- self._sem=pyipc.SemaphoreGroup(int(self._sessionid))
- else:
+ SimpleDispatcher.__init__(self, mobyContentXML, Invocator, None, CommandBuilder, formatter, workdir)
+
+ import pyipc
+
+ #Create a Jobsession ID, a semaphore and a session directory
+ if 'JOBSESSION' in self.queryData.keys():
+ self._sem=pyipc.SemaphoreGroup(int(self._sessionid))
+ else:
self._sem=pyipc.SemaphoreGroup(int(self._sessionid))
self._sem.wait()
-
- del pyipc
+
+ del pyipc
def _execute(self, queryKey):
""" Execute a single query, store the result in the answers attribute
@@ -424,99 +424,99 @@
from bioMoby import MobyString
query=self.queryData[queryKey]
-
- #Execute the query
+
+ #Execute the query
try:
r=self.invocator(queryKey, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
- r.execute()
+ r.execute()
time.sleep(0)
except:
- #If there is a problem, we return the generated exception
- import traceback
- fp=file(self._workdir+"/session"+self._sessionid+"/results-"+queryKey,"w")
- fp.write("The query: "+queryKey+" has not been processed")
- traceback.print_exc(file=fp)
- fp.close()
+ #If there is a problem, we return the generated exception
+ import traceback
+ fp=file(self._workdir+"/session"+self._sessionid+"/results-"+queryKey,"w")
+ fp.write("The query: "+queryKey+" has not been processed")
+ traceback.print_exc(file=fp)
+ fp.close()
del MobyString
def _getSem(self):
- try:
- #Wait for answers of all queries
- self._sem.wait()
- self._fetchAnswers()
+ try:
+ #Wait for answers of all queries
+ self._sem.wait()
+ self._fetchAnswers()
- #Remove the IPC
+ #Remove the IPC
import pyipc
- pyipc.removeIPC(self._sem)
- del pyipc
+ pyipc.removeIPC(self._sem)
+ del pyipc
except:
- pass
-
+ pass
+
def execute(self, timeout=120):
"""Execute the queries with a pool of threads, store the results in a mobyContent Object
"""
- import signal
-
- #Do a fetchAnswers if the user has a sessionid
- if 'JOBSESSION' in self.queryData.keys():
- self.answers=self.queryData
-
- t=threading.Thread(target=self._getSem)
- t.start()
- t.join(timeout)
+ import signal
+
+ #Do a fetchAnswers if the user has a sessionid
+ if 'JOBSESSION' in self.queryData.keys():
+ self.answers=self.queryData
+
+ t=threading.Thread(target=self._getSem)
+ t.start()
+ t.join(timeout)
return self._toMoby()
else:
- #A MobyContent without jobsession id has been received
- try:
- #make a fork
- pid=os.fork()
- except:
- raise Exception, "Cannot treat any further request, please retry"
-
- if pid>0:
- #try to get the results
-# os.wait()
- t=threading.Thread(target=self._getSem)
- t.start()
- t.join(timeout)
+ #A MobyContent without jobsession id has been received
+ try:
+ #make a fork
+ pid=os.fork()
+ except:
+ raise Exception, "Cannot treat any further request, please retry"
+
+ if pid>0:
+ #try to get the results
+# os.wait()
+ t=threading.Thread(target=self._getSem)
+ t.start()
+ t.join(timeout)
return self._toMoby()
- else:
+ else:
#do second fork
try:
pid = os.fork()
- except:
- raise Exception, "Cannot treat any further request, please retry"
-
+ except:
+ raise Exception, "Cannot treat any further request, please retry"
+
if pid>0:
- #it it is too long, kill the process
- time.sleep(1.05*timeout)
+ #it it is too long, kill the process
+ time.sleep(1.05*timeout)
os.kill(os.getppid(), signal.SIGKILL)
- os._exit(0)
- else:
- #execute the treatment
+ os._exit(0)
+ else:
+ #execute the treatment
os.setpgrp()
-
- try:
- os.chdir(self._workdir+"/session"+self._sessionid)
- mt=MultiThread(self._execute, self.queryData.keys())
+
+ try:
+ os.chdir(self._workdir+"/session"+self._sessionid)
+ mt=MultiThread(self._execute, self.queryData.keys())
mt.start()
mt.join()
- except:
- self._clean()
+ except:
+ self._clean()
- try:
- self._sem.signal()
+ try:
+ self._sem.signal()
finally:
- if os.getppid()>1:
- try:
- os.kill(os.getppid(), signal.SIGKILL)
- except:
- pass
-
- os._exit(0)
+ if os.getppid()>1:
+ try:
+ os.kill(os.getppid(), signal.SIGKILL)
+ except:
+ pass
+
+ os._exit(0)
- #if nothing works return nothing
- return {}
+ #if nothing works return nothing
+ return {}
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py 2005/01/18 13:46:24 1.2
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py 2005/02/01 08:52:17 1.3
@@ -18,34 +18,34 @@
"""
def __init__(self, queryid, userdir="/tmp", timeout=None):
- """queryid is the identifier of a query, userdir is the session directory, timeout the limit time of execution of the query
- """
+ """queryid is the identifier of a query, userdir is the session directory, timeout the limit time of execution of the query
+ """
self.command=""
from bioMoby import MobyObject
self.results=[]
self.queryid=queryid
-
+
del MobyObject
self.errors=""
self.userdir=userdir
def getResults(self):
- """retrieve the results if available
- """
+ """retrieve the results if available
+ """
import os
fname=self.userdir+"/results-"+self.queryid
if os.path.isfile(fname):
- #if it is a single file
+ #if it is a single file
fp=file(fname,"r")
self.results=[fp.read()]
fp.close()
# os.remove(fname)
else:
- #or a directory
+ #or a directory
self.results=[]
for fs in os.listdir(fname):
if os.path.isfile(fname+"/"+fs):
@@ -87,32 +87,32 @@
results=[]
- ruserdir=self.userdir+"/results-"+self.queryid
-
+ ruserdir=self.userdir+"/results-"+self.queryid
+
#if there are some output files then retrieve them as results
if len(self.outputfiles) !=0:
i,o,e=os.popen3(self.command)
- #read error first and then standard output
-
+ #read error first and then standard output
+
try:
et=e.read()
- fp=file(self.userdir+"/"+self.queryid+".err")
- fp.write(et)
- fp.close()
+ fp=file(self.userdir+"/"+self.queryid+".err")
+ fp.write(et)
+ fp.close()
except:
- pass
-
- try:
+ pass
+
+ try:
ot=o.read()
- fp=file(self.userdir+"/"+self.queryid+".out")
- fp.write(ot)
- fp.close()
- except:
- pass
+ fp=file(self.userdir+"/"+self.queryid+".out")
+ fp.write(ot)
+ fp.close()
+ except:
+ pass
- #wait unfinished sub process
-
+ #wait unfinished sub process
+
try:
os.wait()
except:
@@ -123,7 +123,7 @@
for f in self.outputfiles:
# if it is a file then retrieve the file
if os.path.isfile(f):
- aTemp=f.split('/')
+ aTemp=f.split('/')
shutil.copy(f, ruserdir+"/"+aTemp[len(aTemp)-1])
os.remove(f)
elif os.path.isdir(f):
@@ -135,20 +135,20 @@
else:
stdin, stdout, stderr=os.popen3(self.command)
results=stdout.read()
-
- fp=file(self.userdir+"/results-"+self.queryid,"w")
- fp.write(results)
- fp.close()
-
-# t=stderr.read()
+
+ fp=file(self.userdir+"/results-"+self.queryid,"w")
+ fp.write(results)
+ fp.close()
+
+# t=stderr.read()
stdin.close()
stdout.close()
stderr.close()
os.wait()
- #Make all the cleaning wipe the session directory
-
+ #Make all the cleaning wipe the session directory
+
for tmpfile in self.tempfiles:
try:
if os.path.isfile(tmpfile):
@@ -163,71 +163,71 @@
"""Parameters for the qsub command
"""
def __init__(self, queueName, qsubParameters):
- """ queueName: name of the PBS queue
- qsubParameters: a list of parameters for the -l option: ['ncpus=4', 'walltime=01:00:00']
- """
- self.queueName=queueName
- self.qsubParameters=qsubParameters
-
-
+ """ queueName: name of the PBS queue
+ qsubParameters: a list of parameters for the -l option: ['ncpus=4', 'walltime=01:00:00']
+ """
+ self.queueName=queueName
+ self.qsubParameters=qsubParameters
+
+
class PBSInvocator(LocalInvocator):
"""Invokes a command in a qsub
"""
def __init__(self, queryKey, command, aFile, userdir="/tmp"):
- """The file is needed here to keep the generated outputfiles
- """
- LocalInvocator.__init__(self, queryKey, command, userdir)
- self._queueName=""
- self._PBSparameters=[]
- self._aFile=aFile
+ """The file is needed here to keep the generated outputfiles
+ """
+ LocalInvocator.__init__(self, queryKey, command, userdir)
+ self._queueName=""
+ self._PBSparameters=[]
+ self._aFile=aFile
def setParameters(self, invocatorParameters):
- """Set parameters of qsub
- """
- self._queueName=invocatorParameters.queueName
- self._PBSparameters=invocatorParameters.qsubParameters
-
+ """Set parameters of qsub
+ """
+ self._queueName=invocatorParameters.queueName
+ self._PBSparameters=invocatorParameters.qsubParameters
+
def execute(self):
- """execute the command in a qsub script
- """
-
- #we write in the session directory a small script
- fp=file(self.userdir+"/"+self.queryid+".sh","w")
- fp.write("#!/bin/bash\n")
- fp.write("#PBS -S /bin/bash\n")
- fp.write("#PBS -o "+self.userdir+"/results-"+self.queryid+".out\n")
- fp.write("#PBS -e "+self.userdir+"/"+self.queryid+".err\n")
- fp.write("#PBS -q %s\n"%self._queueName)
- fp.write("#PBS -l %s\n"%":".join(self._PBSparameters))
-
- #we add to the script the cd command in order to be in the session directory
- fp.write("cd %s\n"%self.userdir)
- fp.write("%s\n"%self.command)
- fp.close()
-
- import os
-
- #execute the qsub command
- i, o, e=os.popen3("qsub "+self.userdir+"/"+self.queryid+".sh")
-
- #see if there are mistakes
- erl=e.read()
-
- if erl!="":
- raise Exception, "".join(erl)
+ """execute the command in a qsub script
+ """
+
+ #we write in the session directory a small script
+ fp=file(self.userdir+"/"+self.queryid+".sh","w")
+ fp.write("#!/bin/bash\n")
+ fp.write("#PBS -S /bin/bash\n")
+ fp.write("#PBS -o "+self.userdir+"/results-"+self.queryid+".out\n")
+ fp.write("#PBS -e "+self.userdir+"/"+self.queryid+".err\n")
+ fp.write("#PBS -q %s\n"%self._queueName)
+ fp.write("#PBS -l %s\n"%":".join(self._PBSparameters))
+
+ #we add to the script the cd command in order to be in the session directory
+ fp.write("cd %s\n"%self.userdir)
+ fp.write("%s\n"%self.command)
+ fp.close()
+
+ import os
+
+ #execute the qsub command
+ i, o, e=os.popen3("qsub "+self.userdir+"/"+self.queryid+".sh")
+
+ #see if there are mistakes
+ erl=e.read()
+
+ if erl!="":
+ raise Exception, "".join(erl)
- #write in the session file the queries, qsub identifiers and the output files
- orl=o.readlines()
+ #write in the session file the queries, qsub identifiers and the output files
+ orl=o.readlines()
anId=orl[0].split('\n')[0]
- if len(self.outputfiles) !=0:
- self._aFile.write(self.queryid+";"+anId+";"+"::".join(self.outputfiles)+"\n")
- os.mkdir(self.userdir+"/results-"+self.queryid)
-
- del os
- #return the qsub identifier
- return anId
+ if len(self.outputfiles) !=0:
+ self._aFile.write(self.queryid+";"+anId+";"+"::".join(self.outputfiles)+"\n")
+ os.mkdir(self.userdir+"/results-"+self.queryid)
+
+ del os
+ #return the qsub identifier
+ return anId
class CGIPostMInvocator(GeneralInvocator):
"""Functions for multipart post
@@ -241,9 +241,9 @@
files is a sequence of (name, filename, value) elements for data to be uploaded as files
Return the server's response page.
"""
-
+
import httplib
-
+
content_type, body = self.encode_multipart_formdata()
print content_type, body
h = httplib.HTTP(host)
@@ -253,9 +253,9 @@
h.endheaders()
h.send(body)
errcode, errmsg, headers = h.getreply()
-
+
del httplib
-
+
return h.file.readlines()
def encode_multipart_formdata(self):
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py 2005/01/18 13:46:24 1.2
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py 2005/02/01 08:52:17 1.3
@@ -53,4 +53,4 @@
from xml.dom.minidom import parseString
print >>sw, '''<%s><![CDATA[%s]]></%s>''' % (self.tag, parseString(toReturn).toprettyxml(), self.tag)
-
+
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py 2005/01/18 13:46:24 1.2
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py 2005/02/01 08:52:17 1.3
@@ -35,7 +35,7 @@
l=filter(filterfunc, setOfParameters)
if l:
- return l[0]
+ return l[0]
return
@@ -56,6 +56,6 @@
l=filter(filterfunc, setOfParameters)
if l:
- return l[0]
-
+ return l[0]
+
return
\ No newline at end of file
More information about the MOBY-guts mailing list.