[MOBY-guts] biomoby commit
Yan Wong
yanwong at pub.open-bio.org
Tue Jan 18 13:46:24 UTC 2005
yanwong
Tue Jan 18 08:46:24 EST 2005
Update of /home/repository/moby/moby-live/Python/bioMoby/webservice
In directory pub.open-bio.org:/tmp/cvs-serv20853/webservice
Modified Files:
Dispatcher.py Invocators.py TCBioMoby.py __init__.py
Log Message:
moby-live/Python/bioMoby/webservice Dispatcher.py,1.1,1.2 Invocators.py,1.1,1.2 TCBioMoby.py,1.1,1.2 __init__.py,1.1,1.2
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2004/12/08 14:46:40 1.1
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Dispatcher.py 2005/01/18 13:46:24 1.2
@@ -1,13 +1,25 @@
-#Class Dispatcher
-#Author Wong Yan
-#Class SingleThread, Multithread, LockedIterator
-#Author: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
-#Date 06/18/2004
-#12/6/2004: changed the constructor code of the AbstractDispatcher (Body tag)
+""" Classes that dispatch the queries stored in a MobyContent XML object into several invocators.
+"""
+"""Author Wong Yan
+ class SingleThread, Multithread, LockedIterator
+ Author: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
+
+ Date 06/18/2004
+
+ 12/6/2004: changed the constructor code of the AbstractDispatcher (Body tag)
+ added a new class: SimpleDispatcher (no multithreading, no use of IPC)
+"""
import threading
import time
import types
+import os
+import sys
+import random
+import re
+import base64
+import shutil
+import binascii
# Pool of threads: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
class SingleThread( threading.Thread ):
@@ -76,11 +88,12 @@
formatter: a function that transforms raw results from the Invocator into Moby Objects
workdir: the session directory
"""
- import base64, binascii, re
+
from bioMoby import MobyUnmarshaller
self._isbase64=False
self._workdir=workdir
+ self.answers={}
#First see if we have a MobyContent Object or a string
mc=mobyContentXML
@@ -108,22 +121,31 @@
self.commandBuilder=CommandBuilder
self.formatter=formatter
- del MobyUnmarshaller, re
+ del MobyUnmarshaller
def execute(self):
""" Override this method
"""
pass
+ def _toMoby(self):
+ """Return the results as a MobyContent Object
+ """
+ from bioMoby import MobyContent
+
+ if self._isbase64:
+ return base64.encodestring(str(MobyContent(self.answers)))
+
+ return str(MobyContent(self.answers))
+
class SimpleDispatcher(AbstractDispatcher):
- """ A simple dispatcher, without execute method
+ """ A simple dispatcher that executes the treatments sequentially
"""
def __init__(self, mobyContentXML, Invocator, InvocatorParameters, CommandBuilder, formatter=None, workdir="/tmp"):
AbstractDispatcher.__init__(self, mobyContentXML, Invocator, InvocatorParameters, CommandBuilder, formatter, workdir)
from bioMoby import MobyObject
- import random, os, sys
if 'JOBSESSION' in self.queryData.keys():
self._sessionid=self.queryData['JOBSESSION'][0].id
@@ -136,9 +158,117 @@
os.mkdir(self._workdir+"/session"+self._sessionid)
os.chdir(self._workdir+"/session"+self._sessionid)
+
self.answers={'JOBSESSION':[MobyObject(namespace="SESSIONID", id=self._sessionid)]}
+ def _clean(self):
+ """Clean the session directory
+ """
+ #Clean the session directory
+ os.chdir(self._workdir)
+ shutil.rmtree(self._workdir+"/session"+self._sessionid)
+
+ def _fetchAnswers(self):
+ """ Fetch the answers and clean the session directory
+ """
+ from bioMoby import GeneralInvocator
+
+ answers={}
+
+ ruserdir="session"+self._sessionid
+ try:
+ l=os.listdir(self._workdir+"/"+ruserdir)
+ except:
+ l=[]
+
+ if len(l)>0:
+ #There is a session directory
+ for querykey in l:
+ if querykey[:8]=="results-":
+ aQueryName=querykey[8:]
+ gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
+ answers[aQueryName]=[gi.getResults()]
+
+ if self.formatter != None:
+ answers[aQueryName]=self.formatter(answers[aQueryName][0])
+ if len(answers.keys())>0:
+ self.answers=answers
+
+ self._clean()
+
+ del GeneralInvocator
+
+ def execute(self):
+ """Execute the queries sequentially
+ """
+
+ try:
+ for key in self.queryData.keys():
+ query=self.queryData[key]
+ r=self.invocator(key, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
+ r.execute()
+
+ self._fetchAnswers()
+ except:
+ import traceback
+ import StringIO
+ from bioMoby import MobyString
+ sio=StringIO.StringIO()
+ traceback.print_exc(file=sio)
+
+ self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
+ self._clean()
+
+ return self._toMoby()
+
+class MultiThreadDispatcher(SimpleDispatcher):
+ """ Use a MultiThread to execute the queries
+ """
+ def _execute(self, queryKey):
+ """ Execute a single query, store the result in the answers attribute
+ """
+ from bioMoby import MobyString
+
+ query=self.queryData[queryKey]
+
+ #Execute the query
+ try:
+ r=self.invocator(queryKey, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
+ r.execute()
+ time.sleep(0)
+
+ except:
+ #If there is a problem, we return the generated exception
+ import traceback
+ import StringIO
+ sio=StringIO.StringIO()
+ traceback.print_exc(file=sio)
+
+ self.answers['JOBSESSION']=[MobyString(content=sio.getvalue())]
+ self._clean()
+
+
+ del MobyString
+
+
+ def execute(self):
+ """ Execute the queries with threads
+ """
+
+ try:
+ os.chdir(self._workdir+"/session"+self._sessionid)
+ mt=MultiThread(self._execute, self.queryData.keys())
+ mt.start()
+ mt.join()
+
+ self._fetchAnswers()
+ except:
+ self._clean()
+
+ return self._toMoby()
+
+
class PBSDispatcher(SimpleDispatcher):
"""Dispatcher for PBS jobs
"""
@@ -156,7 +286,6 @@
def _getResults(self):
""" Do a qstat and check availability of results
"""
- import os, shutil
from bioMoby import GeneralInvocator
answers={}
@@ -241,21 +370,14 @@
if len(answers.keys())>0:
self.answers=answers
- try:
- #Clean the session directory
- shutil.rmtree(tdir)
- os.removedirs(tdir)
- except:
- pass
+ self._clean()
- del GeneralInvocator,os, shutil
+ del GeneralInvocator
def execute(self):
"""Execute the command in a PBS script using the PBSInvocator
"""
- from bioMoby import MobyContent
- import base64
if 'JOBSESSION' in self.queryData.keys():
self.answers=self.queryData
@@ -272,21 +394,9 @@
r.execute()
fp.close()
- if self._isbase64:
- return base64.encodestring(str(MobyContent(self.answers)))
-
- if self._isString:
- return str(MobyContent(self.answers))
-
- results=MobyContent(self.answers)
-
- import TCBioMoby
-
- results.typecode=TCBioMoby.Body()
-
- return results
+ return self._toMoby()
-class Dispatcher(AbstractDispatcher):
+class Dispatcher(SimpleDispatcher):
"""All Queries in the MobyContent Object are splitted and executed
"""
def __init__(self, mobyContentXML, Invocator, CommandBuilder, formatter=None, workdir="/tmp"):
@@ -295,46 +405,30 @@
The commandBuilder function shall return a command, arguments, and a set of temporary files
[queryData]-->("commandName","commandArgs", ["tempfiles"])
"""
- AbstractDispatcher.__init__(self, mobyContentXML, Invocator, None, CommandBuilder, formatter, workdir="/tmp")
+ SimpleDispatcher.__init__(self, mobyContentXML, Invocator, None, CommandBuilder, formatter, workdir)
- import os, random, sys, pyipc
- from bioMoby import MobyObject
+ import pyipc
#Create a Jobsession ID, a semaphore and a session directory
if 'JOBSESSION' in self.queryData.keys():
- self._sessionid=self.queryData['JOBSESSION'][0].id
self._sem=pyipc.SemaphoreGroup(int(self._sessionid))
else:
- self._sessionid=`random.randint(0,sys.maxint-1)`
-
- while "session"+self._sessionid in os.listdir(self._workdir):
- self._sessionid=`random.randint(0,sys.maxint-1)`
-
- os.mkdir(self._workdir+"/session"+self._sessionid)
self._sem=pyipc.SemaphoreGroup(int(self._sessionid))
self._sem.wait()
- #By default, we return the Session ID
- self.answers={'JOBSESSION':[MobyObject(namespace="SESSIONID", id=self._sessionid)]}
-
- #Create a lock for synchronisation for the Multithreaded execution
- self._lock=threading.RLock()
-
- del os, random, sys, pyipc
+ del pyipc
def _execute(self, queryKey):
""" Execute a single query, store the result in the answers attribute
"""
- from bioMoby import MobyObject, MobyString
+ from bioMoby import MobyString
query=self.queryData[queryKey]
- result=[MobyString("The "+queryKey+" has not been processed")]
-
#Execute the query
try:
r=self.invocator(queryKey, self.commandBuilder(query), userdir=self._workdir+"/session"+self._sessionid)
- result=[r.execute()]
+ r.execute()
time.sleep(0)
except:
#If there is a problem, we return the generated exception
@@ -344,67 +438,25 @@
traceback.print_exc(file=fp)
fp.close()
- #Avoir concurrency writing on self.answers
- self._lock.acquire()
- self.answers[queryKey]=result
- self._lock.release()
-
- del MobyObject, MobyString
+ del MobyString
- def _fetchAnswers(self):
- """ Fetch answers if they exists
- """
- from bioMoby import GeneralInvocator
- import os, shutil, pyipc
-
- answers={}
-
- ruserdir="session"+self._sessionid
- try:
- l=os.listdir(self._workdir+"/"+ruserdir)
- except:
- l=[]
-
- if len(l)>0:
- #There is a session directory
- for querykey in l:
- if querykey[:8]=="results-":
- aQueryName=querykey[8:]
- gi=GeneralInvocator(aQueryName ,userdir=self._workdir+"/"+ruserdir)
- answers[aQueryName]=[gi.getResults()]
-
- if self.formatter != None:
- answers[aQueryName]=self.formatter(answers[aQueryName][0])
-
- if len(answers.keys())>0:
- self.answers=answers
-
- try:
- #Clean the session directory
- shutil.rmtree(self._workdir+"/"+ruserdir)
- os.removedirs(self._workdir+"/"+ruserdir)
- except:
- pass
-
- #Remove the IPC
- pyipc.removeIPC(self._sem)
-
- del GeneralInvocator,os, shutil
-
-
def _getSem(self):
try:
#Wait for answers of all queries
self._sem.wait()
self._fetchAnswers()
+
+ #Remove the IPC
+ import pyipc
+ pyipc.removeIPC(self._sem)
+ del pyipc
except:
pass
def execute(self, timeout=120):
"""Execute the queries with a pool of threads, store the results in a mobyContent Object
"""
- import os, pyipc, signal
- from bioMoby import MobyContent
+ import signal
#Do a fetchAnswers if the user has a sessionid
if 'JOBSESSION' in self.queryData.keys():
@@ -414,20 +466,7 @@
t.start()
t.join(timeout)
- if self._isbase64:
- import base64
- return base64.encodestring(str(MobyContent(self.answers)))
-
- if self._isString:
- return str(MobyContent(self.answers))
-
- results=MobyContent(self.answers)
-
- import TCBioMoby
-
- results.typecode=TCBioMoby.Body()
-
- return results
+ return self._toMoby()
else:
#A MobyContent without jobsession id has been received
try:
@@ -443,21 +482,7 @@
t.start()
t.join(timeout)
- if self._isbase64:
- import base64
- return base64.encodestring(str(MobyContent(self.answers)))
-
- if self._isString:
- return str(MobyContent(self.answers))
-
-
- results=MobyContent(self.answers)
-
- import TCBioMoby
-
- results.typecode=TCBioMoby.Body()
-
- return results
+ return self._toMoby()
else:
#do second fork
try:
@@ -473,10 +498,14 @@
else:
#execute the treatment
os.setpgrp()
- os.chdir(self._workdir+"/session"+self._sessionid)
- mt=MultiThread(self._execute, self.queryData.keys())
- mt.start()
- mt.join()
+
+ try:
+ os.chdir(self._workdir+"/session"+self._sessionid)
+ mt=MultiThread(self._execute, self.queryData.keys())
+ mt.start()
+ mt.join()
+ except:
+ self._clean()
try:
self._sem.signal()
@@ -488,3 +517,6 @@
pass
os._exit(0)
+
+ #if nothing works return nothing
+ return {}
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py 2004/12/08 14:46:40 1.1
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/Invocators.py 2005/01/18 13:46:24 1.2
@@ -1,6 +1,11 @@
-#Classes that invoke local commands or CGI scripts
-#Author: Wong Yan
-#Date: 06/18/2004
+""" Classes that invoke local commands or CGI scripts
+"""
+
+"""
+ Author: Wong Yan
+ Date: 06/18/2004
+
+"""
class InvocatorError(Exception):
"""Error during an invocation of method
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py 2004/12/08 14:46:40 1.1
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/TCBioMoby.py 2005/01/18 13:46:24 1.2
@@ -1,7 +1,15 @@
-#Type for bioMoby webservice and ZSI
-#Need ZSI package
-#12/06/2004 Yan Wong
+"""Type for bioMoby webservice and ZSI
+"""
+"""
+ Need ZSI package
+
+ Author:Yan Wong
+
+ Date: 12/06/2004
+
+ 01/17/2005: solved the problem with GBrowse :) return a raw string instead of the text embedded in a Body tag.
+"""
from ZSI import _copyright, _children, \
EvaluateException
@@ -14,26 +22,35 @@
class _TCBody:
def parse(self, elt, ps):
#self.checkname(elt, ps)
-
if _children(elt):
- from bioMoby import MobyUnmarshaller
- um=MobyUnmarshaller()
+ from bioMoby import MobyContent
+ mc=MobyContent()
- return um.loads(elt.firstChild.nodeValue)
-
- return None
+ mc.fromMoby(elt.firstChild.nodeValue)
+
+ return mc
-class Body(TypeCode):
- '''Body Type for the .
+class body(TypeCode):
+ '''Body Type for gbrowse_moby
'''
-
- parselist = [ (None,'Body') ]
+
+ parselist = [ (None,'body') ]
seriallist = [ MobyContent ]
- tag="Body"
+ tag="body"
typecode=_TCBody()
def serialize(self, sw, pyobj, name=None, attrtext='', **kw):
+ if not hasattr(pyobj, "toMoby"):
+ from bioMoby import MobyMarshaller
+ m=MobyMarshaller()
+ toReturn=m.dumps(pyobj)
+ else:
+ toReturn=pyobj.toMoby()
+
n = name or self.oname or ('E%x' % id(pyobj))
- print >>sw, '''<%s><![CDATA[%s]]></%s>''' % (self.tag, str(pyobj), self.tag)
+ from xml.dom.minidom import parseString
+
+ print >>sw, '''<%s><![CDATA[%s]]></%s>''' % (self.tag, parseString(toReturn).toprettyxml(), self.tag)
+
===================================================================
RCS file: /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py 2004/12/08 14:46:40 1.1
+++ /home/repository/moby/moby-live/Python/bioMoby/webservice/__init__.py 2005/01/18 13:46:24 1.2
@@ -1,8 +1,61 @@
-#Author: Wong Yan
-#Date 06/18/2004
-#Web services factory
-#Some classes aimed to ease the building of Moby WebServices
-#
+"""Some classes aimed to ease the building of Moby WebServices
+"""
+
+"""Author: Wong Yan
+ Date 06/18/2004
+ Web services factory
+
+
+ 12/13/2004: added function to manipulate query's data
+
+"""
from Dispatcher import *
from Invocators import *
+
+#Added some functions to manipulate elements in queryData:
+def getParameters(setOfParameters):
+ """ get all parameters from the set of parameters
+ """
+ result=[]
+
+ filterfunc=lambda x: hasattr(x, "__isSecondary__")
+
+ for elem in filter(filterfunc, setOfParameters):
+ result.append(elem)
+
+ return result
+
+def getParameter(setOfParameters, parameterName):
+ """ get a Parameter from a set
+ """
+
+ filterfunc=lambda x: hasattr(x, "__isSecondary__") and x.articleName==parameterName
+
+ l=filter(filterfunc, setOfParameters)
+
+ if l:
+ return l[0]
+
+ return
+
+def getObjects(setOfParameters):
+ """ get all objects from a set of parameters
+ """
+
+ filterfunc=lambda x: not hasattr(x, '__isSecondary__')
+
+ return filter(filterfunc, setOfParameters)
+
+def getObject(setOfParameters, objectName):
+ """ get an object by its name
+ """
+
+ filterfunc=lambda x: not hasattr(x,'__isSecondary__') and hasattr(x,"articleName") and x.articleName==objectName
+
+ l=filter(filterfunc, setOfParameters)
+
+ if l:
+ return l[0]
+
+ return
\ No newline at end of file
More information about the MOBY-guts
mailing list