Changeset 3046
- Timestamp:
- 02/12/08 23:19:32 (10 months ago)
- Files:
-
- ipython1/branches/ipython1-client-r3021/TODO (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/asynclient.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/task.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskclient.py (modified) (4 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskxmlrpc.py (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/branches/ipython1-client-r3021/TODO
r3012 r3046 5 5 Actions: 6 6 7 * Move all the IPython1 documentation to REST text files. 8 * Refactor the exceptions in xmlrpcutil? 7 * Decide if it should be barrier(*args) or barrier(args) 8 * Write test_taskxmlrpc 9 * Write tests for Properties in multiengine 10 * Work on Task interfaces 11 * Change coding conventions of interfaces 12 * Update ChangeLog describing new APIs 13 * Update documentation 9 14 ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py
r3043 r3046 47 47 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin" 48 48 49 rc = client. RemoteController((opts.controller, opts.meport))49 rc = client.MultiEngineController((opts.controller, opts.meport)) 50 50 tc = client.TaskController((opts.controller, opts.tport)) 51 51 ipython1/branches/ipython1-client-r3021/ipython1/kernel/asynclient.py
r3027 r3046 17 17 return me.IFullSynchronousTwoPhaseMultiEngine(smultiengine) 18 18 19 AsynRemoteController = AsynMultiEngineController20 21 19 defaultAddress = (co['client']['connectToMultiEngineControllerOn']['ip'], 22 20 co['client']['connectToMultiEngineControllerOn']['port']) 23 21 """The (ip,port) tuple of the default MultiEngineController.""" 24 22 23 def AsynTaskController(addr): 24 """The default TaskController class obtained from config information.""" 25 _task_controller = kernelConfigManager._import(co['client']['TaskControllerImplementation']) 26 return _task_controller(addr) 25 27 28 defaultTaskAddress = (co['client']['connectToTaskControllerOn']['ip'], 29 co['client']['connectToTaskControllerOn']['port']) 30 """The (ip,port) tuple of the default task controller.""" 31 32 defaultTaskController = defaultTaskAddress ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py
r3044 r3046 6 6 import ipython1.kernel.magic 7 7 from ipython1.kernel.multiengineclient import IFullBlockingMultiEngineClient 8 from ipython1.kernel.taskclient import IBlockingTaskClient 9 from ipython1.kernel.task import Task, Dependency 8 10 from ipython1.kernel.twistedutil import ReactorInThread 9 11 from ipython1.kernel.config import configManager as kernelConfigManager … … 29 31 defaultRemoteController = defaultAddress 30 32 31 from ipython1.kernel.task import Task, Dependency32 33 33 TaskController = kernelConfigManager._import(co['client']['TaskController']) 34 """The default TaskController class obtained from config information.""" 34 35 def TaskController(addr): 36 """The default TaskController class obtained from config information.""" 37 _task_controller = kernelConfigManager._import(co['client']['TaskControllerImplementation']) 38 return IBlockingTaskClient(_task_controller(addr)) 35 39 36 40 defaultTaskAddress = (co['client']['connectToTaskControllerOn']['ip'], … … 40 44 defaultTaskController = defaultTaskAddress 41 45 46 47 42 48 rit = ReactorInThread() 43 49 rit.setDaemon(True) ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py
r3043 r3046 109 109 'MultiEngineImplementation': 'ipython1.kernel.multienginexmlrpc.XMLRPCSynchronousMultiEngineClient', 110 110 'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort}, 111 'TaskController ': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient',111 'TaskControllerImplementation': 'ipython1.kernel.taskxmlrpc.XMLRPCTaskClient', 112 112 'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort} 113 113 } ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py
r3043 r3046 358 358 def cleanOutDeferreds(self): 359 359 d = self._proxy.callRemote('cleanOutDeferreds', deferredID, block) 360 d.addCallback(self.unpackage) 360 d.addCallback(self.unpackage) 361 return d 361 362 362 363 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): ipython1/branches/ipython1-client-r3021/ipython1/kernel/task.py
r3036 r3046 575 575 """ 576 576 577 def getTaskResult(taskID ):577 def getTaskResult(taskID, block=False): 578 578 """Get the result of a task by its ID. 579 579 ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskclient.py
r2837 r3046 18 18 #------------------------------------------------------------------------------- 19 19 20 # from twisted.python import failure 20 from zope.interface import Interface, implements 21 from twisted.python import components, log 21 22 22 from ipython1.kernel import task 23 from ipython1.kernel.twistedutil import blockingCallFromThread 24 from ipython1.kernel import task, error 23 25 24 26 #------------------------------------------------------------------------------- … … 27 29 28 30 class InteractiveTaskClient(object): 29 """XML-RPC version of the Connecting TaskControllerClient"""30 31 31 ############32 # ConnectingTaskController33 ############34 32 def irun(self, *args, **kwargs): 35 33 """Run a task on the `TaskController`. … … 61 59 :Returns: A `TaskResult` object. 62 60 """ 63 block = kwargs.pop('block', self.block)61 block = kwargs.pop('block', False) 64 62 if len(args) == 1 and isinstance(args[0], task.Task): 65 63 t = args[0] … … 68 66 taskID = self.run(t) 69 67 print "TaskID = %i"%taskID 70 return self.getTaskResult(taskID, block) 68 if block: 69 return self.getTaskResult(taskID, block) 70 else: 71 return taskID 72 73 class IBlockingTaskClient(Interface): 74 pass 75 76 77 class BlockingTaskClient(InteractiveTaskClient): 78 79 implements(IBlockingTaskClient) 80 81 def __init__(self, task_controller): 82 self.task_controller = task_controller 83 self.block = True 84 85 def run(self, task): 86 return blockingCallFromThread(self.task_controller.run, task) 87 88 def getTaskResult(self, taskID, block=False): 89 return blockingCallFromThread(self.task_controller.getTaskResult, 90 taskID, block) 91 92 def abort(self, taskID): 93 return blockingCallFromThread(self.task_controller.abort, taskID) 94 95 def barrier(self, taskIDs): 96 return blockingCallFromThread(self.task_controller.barrier, taskIDs) 97 98 def spin(self): 99 return blockingCallFromThread(self.task_controller.spin) 100 101 components.registerAdapter(BlockingTaskClient, 102 task.ITaskController, IBlockingTaskClient) 103 104 ipython1/branches/ipython1-client-r3021/ipython1/kernel/taskxmlrpc.py
r2837 r3046 27 27 from twisted.internet import defer 28 28 from twisted.python import components, failure 29 from twisted.web import xmlrpc as webxmlrpc 29 30 30 31 from ipython1.external.twisted.web2 import xmlrpc, server, channel … … 149 150 #------------------------------------------------------------------------------- 150 151 152 class IXMLRPCTaskClient(Interface): 153 pass 154 151 155 class XMLRPCTaskClient(object): 152 156 """XML-RPC based TaskController client that implements ITaskController. … … 156 160 The ip (str) and port (int) tuple of the `TaskController`. 157 161 """ 158 implements(Task.ITaskController) 159 160 #--------------------------------------------------------------------------- 161 # Begin copy from XMLRPCMultiEngineClient 162 # Should these methods be in a base XMLRPCClient class? 163 #--------------------------------------------------------------------------- 164 162 implements(Task.ITaskController, IXMLRPCTaskClient) 163 165 164 def __init__(self, addr): 166 165 self.addr = addr 167 166 self.url = 'http://%s:%s/' % self.addr 168 self._server = xmlrpclib.ServerProxy(self.url, transport=Transport(), 169 verbose=0) 170 self.block = True 167 self._proxy = webxmlrpc.Proxy(self.url) 171 168 172 169 #--------------------------------------------------------------------------- … … 174 171 #--------------------------------------------------------------------------- 175 172 176 def _reallyBlock(self, block=None): 177 if block is None: 178 return self.block 179 else: 180 if block in (True, False): 181 return block 182 else: 183 raise ValueError("block must be True or False") 184 185 def _executeRemoteMethod(self, f, *args): 186 rawResult = f(*args) 187 result = self._unpackageResult(rawResult) 188 return result 189 190 def _unpackageResult(self, result): 191 result = pickle.loads(result.data) 192 return self._returnOrRaise(result) 193 194 def _returnOrRaise(self, result): 195 if isinstance(result, failure.Failure): 196 result.raiseException() 197 else: 198 return result 173 def unpackage(self, r): 174 return pickle.loads(r.data) 199 175 200 176 #--------------------------------------------------------------------------- … … 238 214 assert isinstance(task, Task.Task), "task must be a Task object!" 239 215 binTask = xmlrpc.Binary(pickle.dumps(task,2)) 240 result = self._executeRemoteMethod(self._server.run, binTask) 241 return result 242 243 def getTaskResult(self, taskID, block=None): 216 d = self._proxy.callRemote('run', binTask) 217 d.addCallback(self.unpackage) 218 return d 219 220 def getTaskResult(self, taskID, block=False): 244 221 """The task result by taskID. 245 222 … … 252 229 :Returns: A `TaskResult` object that encapsulates the task result. 253 230 """ 254 localBlock = self._reallyBlock(block)255 result = self._executeRemoteMethod(self._server.getTaskResult, taskID, localBlock)256 return result231 d = self._proxy.callRemote('getTaskResult', taskID, block) 232 d.addCallback(self.unpackage) 233 return d 257 234 258 235 def abort(self, taskID): … … 265 242 Should I block until the task is aborted. 266 243 """ 267 result = self._executeRemoteMethod(self._server.abort, taskID) 268 return result 244 d = self._proxy.callRemote('abort', taskID) 245 d.addCallback(self.unpackage) 246 return d 269 247 270 248 def barrier(self, taskIDs): … … 275 253 A sequence of taskIDs to block on. 276 254 """ 277 result = self._executeRemoteMethod(self._server.barrier, taskIDs) 278 return result 255 d = self._proxy.callRemote('barrier', taskIDs) 256 d.addCallback(self.unpackage) 257 return d 279 258 280 259 def spin(self): … … 282 261 a task. 283 262 """ 284 result = self._executeRemoteMethod(self._server.spin) 285 286 287 components.registerAdapter(XMLRPCTaskClient, 288 xmlrpclib.ServerProxy, Task.ITaskController) 289 290 291 class XMLRPCInteractiveTaskClient(XMLRPCTaskClient, taskclient.InteractiveTaskClient): 292 pass 293 263 d = self._proxy.callRemote('spin') 264 d.addCallback(self.unpackage) 265 return d 266 267 268
