Changeset 3046

Show
Ignore:
Timestamp:
02/12/08 23:19:32 (10 months ago)
Author:
bgranger
Message:

Initial version of twisted based task client. Seems to work well.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ipython1/branches/ipython1-client-r3021/TODO

    r3012 r3046  
    55Actions: 
    66 
    7 * Move all the IPython1 documentation to REST text files. 
    8 * Refactor the exceptions in xmlrpcutil? 
     7* Decide if it should be barrier(*args) or barrier(args) 
     8* Write test_taskxmlrpc 
     9* Write tests for Properties in multiengine 
     10* Work on Task interfaces 
     11* Change coding conventions of interfaces 
     12* Update ChangeLog describing new APIs 
     13* Update documentation 
    914 
  • ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py

    r3043 r3046  
    4747    assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin" 
    4848     
    49     rc = client.RemoteController((opts.controller, opts.meport)) 
     49    rc = client.MultiEngineController((opts.controller, opts.meport)) 
    5050    tc = client.TaskController((opts.controller, opts.tport)) 
    5151     
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/asynclient.py

    r3027 r3046  
    1717    return me.IFullSynchronousTwoPhaseMultiEngine(smultiengine) 
    1818 
    19 AsynRemoteController = AsynMultiEngineController 
    20  
    2119defaultAddress = (co['client']['connectToMultiEngineControllerOn']['ip'], 
    2220    co['client']['connectToMultiEngineControllerOn']['port']) 
    2321"""The (ip,port) tuple of the default MultiEngineController.""" 
    2422 
     23def AsynTaskController(addr): 
     24    """The default TaskController class obtained from config information.""" 
     25    _task_controller = kernelConfigManager._import(co['client']['TaskControllerImplementation']) 
     26    return _task_controller(addr) 
    2527 
     28defaultTaskAddress = (co['client']['connectToTaskControllerOn']['ip'], 
     29    co['client']['connectToTaskControllerOn']['port']) 
     30"""The (ip,port) tuple of the default task controller.""" 
     31 
     32defaultTaskController = defaultTaskAddress 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py

    r3044 r3046  
    66import ipython1.kernel.magic 
    77from ipython1.kernel.multiengineclient import IFullBlockingMultiEngineClient 
     8from ipython1.kernel.taskclient import IBlockingTaskClient 
     9from ipython1.kernel.task import Task, Dependency 
    810from ipython1.kernel.twistedutil import ReactorInThread 
    911from ipython1.kernel.config import configManager as kernelConfigManager 
     
    2931defaultRemoteController = defaultAddress 
    3032 
    31 from ipython1.kernel.task import Task, Dependency 
    3233 
    33 TaskController = kernelConfigManager._import(co['client']['TaskController']) 
    34 """The default TaskController class obtained from config information.""" 
     34 
     35def TaskController(addr): 
     36    """The default TaskController class obtained from config information.""" 
     37    _task_controller = kernelConfigManager._import(co['client']['TaskControllerImplementation']) 
     38    return IBlockingTaskClient(_task_controller(addr)) 
    3539 
    3640defaultTaskAddress = (co['client']['connectToTaskControllerOn']['ip'], 
     
    4044defaultTaskController = defaultTaskAddress 
    4145 
     46 
     47 
    4248rit = ReactorInThread() 
    4349rit.setDaemon(True) 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py

    r3043 r3046  
    109109    'MultiEngineImplementation': 'ipython1.kernel.multienginexmlrpc.XMLRPCSynchronousMultiEngineClient', 
    110110    'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort}, 
    111     'TaskController': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient', 
     111    'TaskControllerImplementation': 'ipython1.kernel.taskxmlrpc.XMLRPCTaskClient', 
    112112    'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort} 
    113113} 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py

    r3043 r3046  
    358358    def cleanOutDeferreds(self): 
    359359        d = self._proxy.callRemote('cleanOutDeferreds', deferredID, block) 
    360         d.addCallback(self.unpackage)         
     360        d.addCallback(self.unpackage) 
     361        return d        
    361362     
    362363    def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/task.py

    r3036 r3046  
    575575        """ 
    576576     
    577     def getTaskResult(taskID): 
     577    def getTaskResult(taskID, block=False): 
    578578        """Get the result of a task by its ID. 
    579579         
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskclient.py

    r2837 r3046  
    1818#------------------------------------------------------------------------------- 
    1919 
    20 # from twisted.python import failure 
     20from zope.interface import Interface, implements 
     21from twisted.python import components, log 
    2122 
    22 from ipython1.kernel import task 
     23from ipython1.kernel.twistedutil import blockingCallFromThread 
     24from ipython1.kernel import task, error 
    2325 
    2426#------------------------------------------------------------------------------- 
     
    2729 
    2830class InteractiveTaskClient(object): 
    29     """XML-RPC version of the Connecting TaskControllerClient""" 
    3031     
    31     ############ 
    32     # ConnectingTaskController 
    33     ############ 
    3432    def irun(self, *args, **kwargs): 
    3533        """Run a task on the `TaskController`. 
     
    6159        :Returns: A `TaskResult` object.       
    6260        """ 
    63         block = kwargs.pop('block', self.block
     61        block = kwargs.pop('block', False
    6462        if len(args) == 1 and isinstance(args[0], task.Task): 
    6563            t = args[0] 
     
    6866        taskID = self.run(t) 
    6967        print "TaskID = %i"%taskID 
    70         return self.getTaskResult(taskID, block) 
     68        if block: 
     69            return self.getTaskResult(taskID, block) 
     70        else: 
     71            return taskID 
     72 
     73class IBlockingTaskClient(Interface): 
     74    pass 
     75 
     76 
     77class BlockingTaskClient(InteractiveTaskClient): 
     78     
     79    implements(IBlockingTaskClient) 
     80     
     81    def __init__(self, task_controller): 
     82        self.task_controller = task_controller 
     83        self.block = True 
     84         
     85    def run(self, task): 
     86        return blockingCallFromThread(self.task_controller.run, task) 
     87     
     88    def getTaskResult(self, taskID, block=False): 
     89        return blockingCallFromThread(self.task_controller.getTaskResult, 
     90            taskID, block) 
     91     
     92    def abort(self, taskID): 
     93        return blockingCallFromThread(self.task_controller.abort, taskID) 
     94     
     95    def barrier(self, taskIDs): 
     96        return blockingCallFromThread(self.task_controller.barrier, taskIDs) 
     97     
     98    def spin(self): 
     99        return blockingCallFromThread(self.task_controller.spin) 
     100 
     101components.registerAdapter(BlockingTaskClient, 
     102            task.ITaskController, IBlockingTaskClient) 
     103 
     104 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskxmlrpc.py

    r2837 r3046  
    2727from twisted.internet import defer 
    2828from twisted.python import components, failure 
     29from twisted.web import xmlrpc as webxmlrpc 
    2930 
    3031from ipython1.external.twisted.web2 import xmlrpc, server, channel 
     
    149150#------------------------------------------------------------------------------- 
    150151 
     152class IXMLRPCTaskClient(Interface): 
     153    pass 
     154 
    151155class XMLRPCTaskClient(object): 
    152156    """XML-RPC based TaskController client that implements ITaskController. 
     
    156160            The ip (str) and port (int) tuple of the `TaskController`.   
    157161    """ 
    158     implements(Task.ITaskController) 
    159      
    160     #--------------------------------------------------------------------------- 
    161     # Begin copy from XMLRPCMultiEngineClient 
    162     # Should these methods be in a base XMLRPCClient class? 
    163     #--------------------------------------------------------------------------- 
    164      
     162    implements(Task.ITaskController, IXMLRPCTaskClient) 
     163         
    165164    def __init__(self, addr): 
    166165        self.addr = addr 
    167166        self.url = 'http://%s:%s/' % self.addr 
    168         self._server = xmlrpclib.ServerProxy(self.url, transport=Transport(),  
    169             verbose=0) 
    170         self.block = True 
     167        self._proxy = webxmlrpc.Proxy(self.url) 
    171168     
    172169    #--------------------------------------------------------------------------- 
     
    174171    #--------------------------------------------------------------------------- 
    175172         
    176     def _reallyBlock(self, block=None): 
    177         if block is None: 
    178             return self.block 
    179         else: 
    180             if block in (True, False): 
    181                 return block 
    182             else: 
    183                 raise ValueError("block must be True or False") 
    184      
    185     def _executeRemoteMethod(self, f, *args): 
    186         rawResult = f(*args) 
    187         result = self._unpackageResult(rawResult) 
    188         return result 
    189      
    190     def _unpackageResult(self, result): 
    191         result = pickle.loads(result.data) 
    192         return self._returnOrRaise(result) 
    193      
    194     def _returnOrRaise(self, result): 
    195         if isinstance(result, failure.Failure): 
    196             result.raiseException() 
    197         else: 
    198             return result 
     173    def unpackage(self, r): 
     174        return pickle.loads(r.data) 
    199175       
    200176    #--------------------------------------------------------------------------- 
     
    238214        assert isinstance(task, Task.Task), "task must be a Task object!" 
    239215        binTask = xmlrpc.Binary(pickle.dumps(task,2)) 
    240         result = self._executeRemoteMethod(self._server.run, binTask) 
    241         return result 
    242      
    243     def getTaskResult(self, taskID, block=None): 
     216        d = self._proxy.callRemote('run', binTask) 
     217        d.addCallback(self.unpackage) 
     218        return d 
     219     
     220    def getTaskResult(self, taskID, block=False): 
    244221        """The task result by taskID. 
    245222         
     
    252229        :Returns: A `TaskResult` object that encapsulates the task result. 
    253230        """ 
    254         localBlock = self._reallyBlock(block) 
    255         result = self._executeRemoteMethod(self._server.getTaskResult, taskID, localBlock
    256         return result 
     231        d = self._proxy.callRemote('getTaskResult', taskID, block) 
     232        d.addCallback(self.unpackage
     233        return d  
    257234     
    258235    def abort(self, taskID): 
     
    265242                Should I block until the task is aborted.         
    266243        """ 
    267         result = self._executeRemoteMethod(self._server.abort, taskID) 
    268         return result 
     244        d = self._proxy.callRemote('abort', taskID) 
     245        d.addCallback(self.unpackage) 
     246        return d  
    269247         
    270248    def barrier(self, taskIDs): 
     
    275253                A sequence of taskIDs to block on. 
    276254        """ 
    277         result = self._executeRemoteMethod(self._server.barrier, taskIDs) 
    278         return result 
     255        d = self._proxy.callRemote('barrier', taskIDs) 
     256        d.addCallback(self.unpackage) 
     257        return d  
    279258     
    280259    def spin(self): 
     
    282261        a task. 
    283262        """ 
    284         result = self._executeRemoteMethod(self._server.spin) 
    285  
    286  
    287 components.registerAdapter(XMLRPCTaskClient,  
    288         xmlrpclib.ServerProxy, Task.ITaskController) 
    289      
    290  
    291 class XMLRPCInteractiveTaskClient(XMLRPCTaskClient, taskclient.InteractiveTaskClient): 
    292     pass 
    293  
     263        d = self._proxy.callRemote('spin') 
     264        d.addCallback(self.unpackage) 
     265        return d 
     266 
     267 
     268