Changeset 3036
- Timestamp:
- 02/11/08 18:03:51 (10 months ago)
- Files:
-
- ipython1/branches/ipython1-client-r3021/ipython1/core/interpreter.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py (modified) (6 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/pendingdeferred.py (modified) (7 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/task.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/multienginetest.py (modified) (15 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multienginexmlrpc.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_pendingdeferred.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_task.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/tgenerator.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/branches/ipython1-client-r3021/ipython1/core/interpreter.py
r3028 r3036 545 545 return result 546 546 547 def pullFunction(self, key ):548 return self.pull(key )547 def pullFunction(self, keys): 548 return self.pull(keys) 549 549 550 550 #### Interactive user API ################################################## ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py
r3031 r3036 678 678 679 679 def getResult(self, i=None, targets='all'): 680 return self._submitThenBlock(' pullSerialized', i, targets)680 return self._submitThenBlock('getResult', i, targets) 681 681 682 682 def reset(self, targets='all'): ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py
r3031 r3036 114 114 log.msg("Adapting: %r"%multiengine) 115 115 self.smultiengine = ISynchronousMultiEngine(multiengine) 116 self._deferredIDCallbacks = {} 116 117 117 118 #--------------------------------------------------------------------------- … … 133 134 @packageResult 134 135 def xmlrpc_getPendingDeferred(self, request, deferredID, block): 135 return self.smultiengine.getPendingDeferred(deferredID, block) 136 d = self.smultiengine.getPendingDeferred(deferredID, block) 137 try: 138 callback = self._deferredIDCallbacks.pop(deferredID) 139 except KeyError: 140 callback = None 141 if callback is not None: 142 d.addCallback(callback[0], *callback[1], **callback[2]) 143 return d 136 144 145 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 146 self._deferredIDCallbacks[did] = (callback, args, kwargs) 147 return did 148 137 149 #--------------------------------------------------------------------------- 138 150 # IEngineMultiplexer related methods … … 174 186 @packageResult 175 187 def xmlrpc_pullFunction(self, request, keys, targets, block): 188 def can_functions(r, keys): 189 if len(keys)==1 or isinstance(keys, str): 190 result = canSequence(r) 191 elif len(keys)>1: 192 result = [canSequence(s) for s in r] 193 return result 176 194 d = self.smultiengine.pullFunction(keys, targets=targets, block=block) 177 if len(keys)==1:178 d.addCallback(can Sequence)179 el if len(keys)>1:180 d.addCallback( self._canMultipleKeys)195 if block: 196 d.addCallback(can_functions, keys) 197 else: 198 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys)) 181 199 return d 182 200 … … 310 328 self.url = 'http://%s:%s/' % self.addr 311 329 self._proxy = webxmlrpc.Proxy(self.url) 330 self._deferredIDCallbacks = {} 312 331 313 332 #--------------------------------------------------------------------------- … … 325 344 d = self._proxy.callRemote('getPendingDeferred', deferredID, block) 326 345 d.addCallback(self.unpackage) 327 return d 328 346 try: 347 callback = self._deferredIDCallbacks.pop(deferredID) 348 except KeyError: 349 callback = None 350 if callback is not None: 351 d.addCallback(callback[0], *callback[1], **callback[2]) 352 return d 353 354 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 355 self._deferredIDCallbacks[did] = (callback, args, kwargs) 356 return did 357 329 358 #--------------------------------------------------------------------------- 330 359 # IEngineMultiplexer related methods … … 355 384 356 385 def pullFunction(self, keys, targets='all', block=True): 386 def uncan_functions(r, keys): 387 if len(keys)==1 or isinstance(keys, str): 388 return uncanSequence(r) 389 elif len(keys)>1: 390 return [uncanSequence(s) for s in r] 357 391 d = self._proxy.callRemote('pullFunction', keys, targets, block) 358 d.addCallback(self.unpackage) 359 d.addCallback(uncanSequence) 392 if block: 393 d.addCallback(self.unpackage) 394 d.addCallback(uncan_functions, keys) 395 else: 396 d.addCallback(self.unpackage) 397 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys)) 360 398 return d 361 399 ipython1/branches/ipython1-client-r3021/ipython1/kernel/pendingdeferred.py
r3031 r3036 58 58 return guid.generate() 59 59 60 def savePendingDeferred(self, deferredID, d ):60 def savePendingDeferred(self, deferredID, d, callback=None, arguments=None): 61 61 """Save a deferred to me by deferredID. 62 62 … … 66 66 d : Deferred 67 67 The deferred to save. 68 callback : FunctionType 69 A function to add to the callback of d before returning. 70 arguments: tuple 71 A two tuple of (args, kwargs) to pass to the function 68 72 """ 69 73 … … 71 75 if pd is not None: 72 76 self.removePendingDeferred(deferredID) 73 self.pendingDeferreds[deferredID] = d77 self.pendingDeferreds[deferredID] = (d, callback, arguments) 74 78 75 79 def removePendingDeferred(self, deferredID): … … 81 85 """ 82 86 83 pd = self.pendingDeferreds.get(deferredID) 84 if pd is not None: 87 pd_tuple = self.pendingDeferreds.get(deferredID) 88 if pd_tuple is not None: 89 pd = pd_tuple[0] 85 90 pd.addErrback(lambda f: None) 86 91 del self.pendingDeferreds[deferredID] … … 105 110 """ 106 111 #log.msg("getPendingDeferred: %s %s" % (repr(deferredID), repr(block))) 107 pd = self.pendingDeferreds.get(deferredID) 108 if pd is not None: 112 pd_tuple = self.pendingDeferreds.get(deferredID) 113 if pd_tuple is not None: 114 # Pull out the callback function and it args/kwargs 115 pd = pd_tuple[0] 116 cbfunc = pd_tuple[1] 117 cbfunc_args = pd_tuple[2] 118 if cbfunc_args is not None: 119 cbfunc_posargs = cbfunc_args[0] 120 cbfunc_kwargs = cbfunc_args[1] 121 109 122 if not pd.called and block: # pd has not fired and we should block 110 123 #log.msg("pendingDeferred has not been called: %s" % deferredID) 111 124 pd.addCallback(self._deleteAndPassThrough, deferredID) 112 returnpd125 dToReturn = pd 113 126 elif not pd.called and not block: # pd has not fired, but we should not block 114 127 return defer.fail(failure.Failure(error.ResultNotCompleted("Result not completed: %r" % deferredID))) … … 123 136 # It has fired so remove it! 124 137 self.removePendingDeferred(deferredID) 138 # Register the callback function with its args/kwargs if needed 139 if cbfunc is not None: 140 dToReturn.addCallback(cbfunc, *cbfunc_posargs, **cbfunc_kwargs) 125 141 return dToReturn 126 142 else: … … 158 174 block = kwargs.pop('block') 159 175 except KeyError: 160 block = True 176 block = True # The default if not specified 161 177 if block: 162 178 return wrappedMethod(pendingDeferredManager, *args, **kwargs) ipython1/branches/ipython1-client-r3021/ipython1/kernel/task.py
r3021 r3036 383 383 384 384 if task.setupNS is not None: 385 d.addCallback(lambda r: self.queuedEngine.push( **task.setupNS))385 d.addCallback(lambda r: self.queuedEngine.push(task.setupNS)) 386 386 387 387 d.addCallback(lambda r: self.queuedEngine.execute(task.expression)) 388 388 389 389 if task.resultNames is not None: 390 d.addCallback(lambda r: self.queuedEngine.pull( *task.resultNames))390 d.addCallback(lambda r: self.queuedEngine.pull(task.resultNames)) 391 391 else: 392 392 d.addCallback(lambda r: None) ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/multienginetest.py
r3032 r3036 23 23 from ipython1.testutils import util 24 24 from ipython1.kernel import newserialized 25 from ipython1.kernel.util import printer 25 26 from ipython1.kernel.error import InvalidEngineID, NoEnginesRegistered 26 27 from ipython1.kernel.tests.engineservicetest import validCommands, invalidCommands 27 from ipython1.kernel.tests.t estgenerator import (MultiEngineExecuteAllTestGenerator,28 from ipython1.kernel.tests.tgenerator import (MultiEngineExecuteAllTestGenerator, 28 29 MultiEngineFailingExecuteTestGenerator, 29 30 MultiEngineGetResultTestGenerator) … … 58 59 return globala*x 59 60 61 def isdid(did): 62 if not isinstance(did, str): 63 return False 64 if not len(did)==40: 65 return False 66 return True 60 67 61 68 … … 69 76 """Does self.engine claim to implement IEngineCore?""" 70 77 self.assert_(me.IEngineMultiplexer.providedBy(self.multiengine)) 71 self.assert_(me.IEngineMultiplexerAll.providedBy(self.multiengine))72 78 self.assert_(me.IMultiEngine.providedBy(self.multiengine)) 73 79 … … 83 89 d.addCallback(lambda _: self.multiengine.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0)) 84 90 d.addCallback(lambda _: self.multiengine.pull(('a','b','c'),targets=0)) 85 d.addCallback(lambda _: self.multiengine.getResult( 0))86 d.addCallback(lambda _: self.multiengine.reset( 0))87 d.addCallback(lambda _: self.multiengine.keys( 0))91 d.addCallback(lambda _: self.multiengine.getResult(targets=0)) 92 d.addCallback(lambda _: self.multiengine.reset(targets=0)) 93 d.addCallback(lambda _: self.multiengine.keys(targets=0)) 88 94 d.addCallback(lambda _: self.multiengine.pushSerialized(dict(a=newserialized.serialize(10)),targets=0)) 89 95 d.addCallback(lambda _: self.multiengine.pullSerialized('a',targets=0)) 90 d.addCallback(lambda _: self.multiengine.clearQueue( 0))91 d.addCallback(lambda _: self.multiengine.queueStatus( 0))96 d.addCallback(lambda _: self.multiengine.clearQueue(targets=0)) 97 d.addCallback(lambda _: self.multiengine.queueStatus(targets=0)) 92 98 return d 93 99 … … 101 107 d.addCallback(lambda _: self.multiengine.pull('a', targets=badID)) 102 108 d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) 103 d.addCallback(lambda _: self.multiengine.getResult(targets=badID))104 d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))109 # d.addCallback(lambda _: self.multiengine.getResult(targets=badID)) 110 # d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) 105 111 d.addCallback(lambda _: self.multiengine.reset(targets=badID)) 106 112 d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) … … 119 125 d = self.multiengine.execute('a=5', targets=badID) 120 126 d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) 121 d.addCallback(lambda _: self.multiengine.push( a=5, targets=badID))127 d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID)) 122 128 d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) 123 129 d.addCallback(lambda _: self.multiengine.pull('a', targets=badID)) … … 170 176 self.addEngine(1) 171 177 objs = [10,"hi there",1.2342354,{"p":(1,2)}] 172 d = self.multiengine.push( 0, key=objs[0])173 d.addCallback(lambda _: self.multiengine.pull( 0, 'key'))178 d = self.multiengine.push(dict(key=objs[0]), targets=0) 179 d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) 174 180 d.addCallback(lambda r: self.assertEquals(r, [objs[0]])) 175 d.addCallback(lambda _: self.multiengine.push( 0, key=objs[1]))176 d.addCallback(lambda _: self.multiengine.pull( 0, 'key'))181 d.addCallback(lambda _: self.multiengine.push(dict(key=objs[1]), targets=0)) 182 d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) 177 183 d.addCallback(lambda r: self.assertEquals(r, [objs[1]])) 178 d.addCallback(lambda _: self.multiengine.push( 0, key=objs[2]))179 d.addCallback(lambda _: self.multiengine.pull( 0, 'key'))184 d.addCallback(lambda _: self.multiengine.push(dict(key=objs[2]), targets=0)) 185 d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) 180 186 d.addCallback(lambda r: self.assertEquals(r, [objs[2]])) 181 d.addCallback(lambda _: self.multiengine.push( 0, key=objs[3]))182 d.addCallback(lambda _: self.multiengine.pull( 0, 'key'))187 d.addCallback(lambda _: self.multiengine.push(dict(key=objs[3]), targets=0)) 188 d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) 183 189 d.addCallback(lambda r: self.assertEquals(r, [objs[3]])) 184 d.addCallback(lambda _: self.multiengine.reset( 0))185 d.addCallback(lambda _: self.multiengine.pull( 0, 'a'))190 d.addCallback(lambda _: self.multiengine.reset(targets=0)) 191 d.addCallback(lambda _: self.multiengine.pull('a', targets=0)) 186 192 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) 187 193 return d … … 189 195 def testPushPullAll(self): 190 196 self.addEngine(4) 191 d = self.multiengine.push All(a=10)192 d.addCallback(lambda _: self.multiengine.pull All('a'))197 d = self.multiengine.push(dict(a=10)) 198 d.addCallback(lambda _: self.multiengine.pull('a')) 193 199 d.addCallback(lambda r: self.assert_(r==[10,10,10,10])) 194 d.addCallback(lambda _: self.multiengine.push All(a=10, b=20))195 d.addCallback(lambda _: self.multiengine.pull All('a','b'))200 d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20))) 201 d.addCallback(lambda _: self.multiengine.pull(('a','b'))) 196 202 d.addCallback(lambda r: self.assert_(r==4*[[10,20]])) 197 d.addCallback(lambda _: self.multiengine.push( 0, a=10, b=20))198 d.addCallback(lambda _: self.multiengine.pull( 0,'a','b'))203 d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20), targets=0)) 204 d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0)) 199 205 d.addCallback(lambda r: self.assert_(r==[[10,20]])) 200 d.addCallback(lambda _: self.multiengine.push( 0, a=None, b=None))201 d.addCallback(lambda _: self.multiengine.pull( 0, 'a','b'))206 d.addCallback(lambda _: self.multiengine.push(dict(a=None, b=None), targets=0)) 207 d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0)) 202 208 d.addCallback(lambda r: self.assert_(r==[[None,None]])) 203 209 return d … … 206 212 self.addEngine(1) 207 213 objs = [10,"hi there",1.2342354,{"p":(1,2)}] 208 d = self.multiengine.pushSerialized( 0, key=newserialized.serialize(objs[0]))209 d.addCallback(lambda _: self.multiengine.pullSerialized( 0, 'key'))214 d = self.multiengine.pushSerialized(dict(key=newserialized.serialize(objs[0])), targets=0) 215 d.addCallback(lambda _: self.multiengine.pullSerialized('key', targets=0)) 210 216 d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) 211 217 d.addCallback(lambda r: self.assertEquals(r, objs[0])) 212 d.addCallback(lambda _: self.multiengine.pushSerialized( 0, key=newserialized.serialize(objs[1])))213 d.addCallback(lambda _: self.multiengine.pullSerialized( 0, 'key'))218 d.addCallback(lambda _: self.multiengine.pushSerialized(dict(key=newserialized.serialize(objs[1])), targets=0)) 219 d.addCallback(lambda _: self.multiengine.pullSerialized('key', targets=0)) 214 220 d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) 215 221 d.addCallback(lambda r: self.assertEquals(r, objs[1])) 216 d.addCallback(lambda _: self.multiengine.pushSerialized( 0, key=newserialized.serialize(objs[2])))217 d.addCallback(lambda _: self.multiengine.pullSerialized( 0, 'key'))222 d.addCallback(lambda _: self.multiengine.pushSerialized(dict(key=newserialized.serialize(objs[2])), targets=0)) 223 d.addCallback(lambda _: self.multiengine.pullSerialized('key', targets=0)) 218 224 d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) 219 225 d.addCallback(lambda r: self.assertEquals(r, objs[2])) 220 d.addCallback(lambda _: self.multiengine.pushSerialized( 0, key=newserialized.serialize(objs[3])))221 d.addCallback(lambda _: self.multiengine.pullSerialized( 0, 'key'))226 d.addCallback(lambda _: self.multiengine.pushSerialized(dict(key=newserialized.serialize(objs[3])), targets=0)) 227 d.addCallback(lambda _: self.multiengine.pullSerialized('key', targets=0)) 222 228 d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) 223 229 d.addCallback(lambda r: self.assertEquals(r, objs[3])) 224 d.addCallback(lambda _: self.multiengine.reset( 0))225 d.addCallback(lambda _: self.multiengine.pullSerialized( 0, 'a'))230 d.addCallback(lambda _: self.multiengine.reset(targets=0)) 231 d.addCallback(lambda _: self.multiengine.pullSerialized('a', targets=0)) 226 232 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) 227 233 return d … … 252 258 dikt.pop(key) 253 259 return dikt 254 d = self.multiengine.execute( target, cmd)255 d.addCallback(lambda _: self.multiengine.getResult(target ))260 d = self.multiengine.execute(cmd, targets=target) 261 d.addCallback(lambda _: self.multiengine.getResult(targets=target)) 256 262 d.addCallback(lambda r: self.assertEquals(shellResult, popit(r[0],'id'))) 257 263 return d … … 259 265 def testGetResultFailure(self): 260 266 self.addEngine(1) 261 d = self.multiengine.getResult( 0, None)267 d = self.multiengine.getResult(None, targets=0) 262 268 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) 263 d.addCallback(lambda _: self.multiengine.getResult( 0, 10))269 d.addCallback(lambda _: self.multiengine.getResult(10, targets=0)) 264 270 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) 265 271 return d … … 267 273 def testPushFunction(self): 268 274 self.addEngine(1) 269 d = self.multiengine.pushFunction( 0,f=testf)270 d.addCallback(lambda _: self.multiengine.execute( 0,'result = f(10)'))271 d.addCallback(lambda _: self.multiengine.pull( 0, 'result'))275 d = self.multiengine.pushFunction(dict(f=testf), targets=0) 276 d.addCallback(lambda _: self.multiengine.execute('result = f(10)', targets=0)) 277 d.addCallback(lambda _: self.multiengine.pull('result', targets=0)) 272 278 d.addCallback(lambda r: self.assertEquals(r[0], testf(10))) 273 d.addCallback(lambda _: self.multiengine.push( 0,globala=globala))274 d.addCallback(lambda _: self.multiengine.pushFunction( 0,g=testg))275 d.addCallback(lambda _: self.multiengine.execute( 0,'result = g(10)'))276 d.addCallback(lambda _: self.multiengine.pull( 0, 'result'))279 d.addCallback(lambda _: self.multiengine.push(dict(globala=globala), targets=0)) 280 d.addCallback(lambda _: self.multiengine.pushFunction(dict(g=testg), targets=0)) 281 d.addCallback(lambda _: self.multiengine.execute('result = g(10)', targets=0)) 282 d.addCallback(lambda _: self.multiengine.pull('result', targets=0)) 277 283 d.addCallback(lambda r: self.assertEquals(r[0], testg(10))) 278 284 return d … … 280 286 def testPullFunction(self): 281 287 self.addEngine(1) 282 d = self.multiengine.push( 0,a=globala)283 d.addCallback(lambda _: self.multiengine.pushFunction( 0,f=testf))284 d.addCallback(lambda _: self.multiengine.pullFunction( 0, 'f'))288 d = self.multiengine.push(dict(a=globala), targets=0) 289 d.addCallback(lambda _: self.multiengine.pushFunction(dict(f=testf), targets=0)) 290 d.addCallback(lambda _: self.multiengine.pullFunction('f', targets=0)) 285 291 d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10))) 292 d.addCallback(lambda _: self.multiengine.execute("def g(x): return x*x", targets=0)) 293 d.addCallback(lambda _: self.multiengine.pullFunction(('f','g'),targets=0)) 294 d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100))) 286 295 return d 287 296 288 297 def testPushFunctionAll(self): 289 298 self.addEngine(4) 290 d = self.multiengine.pushFunction All(f=testf)291 d.addCallback(lambda _: self.multiengine.execute All('result = f(10)'))292 d.addCallback(lambda _: self.multiengine.pull All('result'))299 d = self.multiengine.pushFunction(dict(f=testf)) 300 d.addCallback(lambda _: self.multiengine.execute('result = f(10)')) 301 d.addCallback(lambda _: self.multiengine.pull('result')) 293 302 d.addCallback(lambda r: self.assertEquals(r, 4*[testf(10)])) 294 d.addCallback(lambda _: self.multiengine.push All(globala=globala))295 d.addCallback(lambda _: self.multiengine.pushFunction All(testg=testg))296 d.addCallback(lambda _: self.multiengine.execute All('result = testg(10)'))297 d.addCallback(lambda _: self.multiengine.pull All('result'))303 d.addCallback(lambda _: self.multiengine.push(dict(globala=globala))) 304 d.addCallback(lambda _: self.multiengine.pushFunction(dict(testg=testg))) 305 d.addCallback(lambda _: self.multiengine.execute('result = testg(10)')) 306 d.addCallback(lambda _: self.multiengine.pull('result')) 298 307 d.addCallback(lambda r: self.assertEquals(r, 4*[testg(10)])) 299 308 return d … … 301 310 def testPullFunctionAll(self): 302 311 self.addEngine(4) 303 d = self.multiengine.pushFunction All(f=testf)304 d.addCallback(lambda _: self.multiengine.pullFunction All('f'))312 d = self.multiengine.pushFunction(dict(f=testf)) 313 d.addCallback(lambda _: self.multiengine.pullFunction('f')) 305 314 d.addCallback(lambda r: self.assertEquals([func(10) for func in r], 4*[testf(10)])) 306 315 return d … … 309 318 class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase): 310 319 311 def testI MultiEngineInterface(self):320 def testISynchronousMultiEngineInterface(self): 312 321 """Does self.engine claim to implement IEngineCore?""" 313 322 self.assert_(me.ISynchronousEngineMultiplexer.providedBy(self.multiengine)) 314 self.assert_(me.ISynchronousEngineMultiplexerAll.providedBy(self.multiengine))315 323 self.assert_(me.ISynchronousMultiEngine.providedBy(self.multiengine)) 316 324 317 325 def testExecute(self): 318 326 self.addEngine(4) 319 d = self.multiengine.execute(True, 0, 'a=5') 320 d.addCallback(lambda _: self.assert_(True)) 321 return d 322 327 execute = self.multiengine.execute 328 d = execute('a=5', targets=0, block=True) 329 d.addCallback(lambda r: self.assert_(len(r)==1)) 330 d.addCallback(lambda _: execute('b=10')) 331 d.addCallback(lambda r: self.assert_(len(r)==4)) 332 d.addCallback(lambda _: execute('c=30', block=False)) 333 d.addCallback(lambda did: self.assert_(isdid(did))) 334 d.addCallback(lambda _: execute('d=[0,1,2]', block=False)) 335 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 336 d.addCallback(lambda r: self.assert_(len(r)==4)) 337 return d 338 339 def testPushPull(self): 340 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) 341 self.addEngine(4) 342 push = self.multiengine.push 343 pull = self.multiengine.pull 344 d = push({'data':data}, targets=0) 345 d.addCallback(lambda r: pull('data', targets=0)) 346 d.addCallback(lambda r: self.assertEqual(r,[data])) 347 d.addCallback(lambda _: push({'data':data})) 348 d.addCallback(lambda r: pull('data')) 349 d.addCallback(lambda r: self.assertEqual(r,4*[data])) 350 d.addCallback(lambda _: push({'data':data}, block=False)) 351 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 352 d.addCallback(lambda _: pull('data', block=False)) 353 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 354 d.addCallback(lambda r: self.assertEqual(r,4*[data])) 355 return d 356 357 def testPushPullFunction(self): 358 self.addEngine(4) 359 pushf = self.multiengine.pushFunction 360 pullf = self.multiengine.pullFunction 361 push = self.multiengine.push 362 pull = self.multiengine.pull 363 execute = self.multiengine.execute 364 d = pushf({'testf':testf}, targets=0) 365 d.addCallback(lambda r: pullf('testf', targets=0)) 366 d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0))) 367 # d.addCallback(lambda _: execute('r = testf(10)', targets=0)) 368 # d.addCallback(lambda _: pull('r', targets=0)) 369 # d.addCallback(lambda r: self.assertEquals(r[0], testf(10))) 370 # d.addCallback(lambda _: pushf({'testf':testf}, block=False)) 371 # d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 372 # d.addCallback(lambda _: pullf('testf', block=False)) 373 # d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 374 # d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0))) 375 # d.addCallback(lambda _: execute("def g(x): return x*x", targets=0)) 376 # d.addCallback(lambda _: pullf(('testf','g'),targets=0)) 377 # d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100))) 378 return d 379 380 def testGetResult(self): 381 shell = Interpreter() 382 result1 = shell.execute('a=10') 383 result1['id'] = 0 384 result2 = shell.execute('b=20') 385 result2['id'] = 0 386 execute= self.multiengine.execute 387 getResult = self.multiengine.getResult 388 self.addEngine(1) 389 d = execute('a=10') 390 d.addCallback(lambda _: getResult()) 391 d.addCallback(lambda r: self.assertEquals(r[0], result1)) 392 d.addCallback(lambda _: execute('b=20')) 393 d.addCallback(lambda _: getResult(1)) 394 d.addCallback(lambda r: self.assertEquals(r[0], result1)) 395 d.addCallback(lambda _: getResult(2, block=False)) 396 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 397 d.addCallback(lambda r: self.assertEquals(r[0], result2)) 398 return d 399 400 def testReset(self): 401 pass 402 403 def testKeys(self): 404 pass 405 406 def testPushSerialized(self): 407 pass 408 409 def testPullSerialized(self): 410 pass 411 412 def testClearQueue(self): 413 pass 414 415 def testQueueStatus(self): 416 pass 417 418 def testSetProperties(self): 419 pass 420 421 def testGetProperties(self): 422 pass 423 424 def testHasProperties(self): 425 pass 426 427 def testDelProperties(self): 428 pass 429 430 def testclearProperties(self): 431 pass 432 433 def testGetIDs(self): 434 pass 435 436 437 323 438 class ITwoPhaseMultiEngineTestCase(IMultiEngineBaseTestCase): 324 439 pass ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multienginexmlrpc.py
r3023 r3036 11 11 from ipython1.kernel.controllerservice import ControllerService 12 12 from ipython1.kernel.multiengine import IMultiEngine 13 from ipython1.kernel.tests.multienginetest import IEngineMultiplexerTestCase 13 from ipython1.kernel.tests.multienginetest import (IMultiEngineTestCase, 14 ISynchronousMultiEngineTestCase) 14 15 15 16 from ipython1.kernel.multienginexmlrpc import IXMLRPCMultiEngineFactory 16 from ipython1.kernel.multienginexmlrpc import XMLRPCMultiEngineClient 17 from ipython1.kernel.multienginexmlrpc import XMLRPCSynchronousMultiEngineClient 18 from ipython1.kernel import multiengine as me 17 19 18 20 class BasicMultiEngineTestCase(DeferredTestCase, 19 I EngineMultiplexerTestCase):21 IMultiEngineTestCase): 20 22 21 23 def setUp(self): … … 29 31 self.imultiengine_factory = IXMLRPCMultiEngineFactory(self.imultiengine) 30 32 self.servers.append(reactor.listenTCP(10105, self.imultiengine_factory)) 31 self.multiengine = XMLRPCMultiEngineClient(('localhost',10105))33 self.multiengine = me.IFullTwoPhaseMultiEngine(XMLRPCSynchronousMultiEngineClient(('localhost',10105))) 32 34 self.engines = [] 33 35 … … 50 52 return dl 51 53 54 class SynchronousMultiEngineTestCase(DeferredTestCase, 55 ISynchronousMultiEngineTestCase): 56 57 def setUp(self): 58 self.servers = [] 59 self.clients = [] 60 self.services = [] 61 62 self.controller = ControllerService() 63 self.controller.startService() 64 self.imultiengine = IMultiEngine(self.controller) 65 self.imultiengine_factory = IXMLRPCMultiEngineFactory(self.imultiengine) 66 self.servers.append(reactor.listenTCP(10105, self.imultiengine_factory)) 67 self.multiengine = XMLRPCSynchronousMultiEngineClient(('localhost',10105)) 68 self.engines = [] 69 70 def tearDown(self): 71 l = [] 72 for s in self.servers: 73 try: 74 d = s.stopListening() 75 if d is not None: 76 l.append(d) 77 except: 78 pass 79 for c in self.clients: 80 c.disconnect() 81 del c 82 dl = defer.DeferredList(l) 83 self.controller.stopService() 84 for e in self.engines: 85 e.stopService() 86 return dl ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_pendingdeferred.py
r3031 r3036 50 50 51 51 def bar(self, bahz): 52 return defer.succeed('blahblah ' +bahz)52 return defer.succeed('blahblah: %s' % bahz) 53 53 54 54 class TwoPhaseFoo(pd.PendingDeferredManager): … … 61 61 def bar(self, bahz): 62 62 return self.foo.bar(bahz) 63 63 64 def bam(self, bahz, block): 65 def process_it(r, extra1, extra2='hi'): 66 return r+' '+extra1+' '+extra2 67 d = self.foo.bar(bahz) 68 if block: 69 d.addCallback(process_it, 'extra1', extra2='extra2') 70 return d 71 else: 72 deferredID = self.getNextDeferredID() 73 self.savePendingDeferred(deferredID, d, callback=process_it, arguments=(('extra1',), dict(extra2='extra2'))) 74 return defer.succeed(deferredID) 64 75 65 76 class PendingDeferredManagerTest(DeferredTestCase): … … 93 104 d = pdm.getPendingDeferred(did,False) 94 105 d.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException)) 95 96 106 107 def testCallback(self): 108 foo = Foo() 109 pdm = TwoPhaseFoo(foo) 110 d = pdm.bam('bam', block=True) 111 d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 112 d.addCallback(lambda r: pdm.bam('bam', block=False)) 113 d.addCallback(lambda did: pdm.getPendingDeferred(did, True)) 114 d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 115 return d 97 116 98 117 #------------------------------------------------------------------------------- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_task.py
r2837 r3036 73 73 74 74 def testClears(self): 75 d = self.me.execute( 0,'b=1')75 d = self.me.execute('b=1', targets=0) 76 76 t = task.Task('a=1', clearBefore=True, resultNames='b', clearAfter=True) 77 77 d.addCallback(lambda _:self.tc.run(t)) … … 79 79 d.addCallback(lambda tr: tr.failure) 80 80 d = self.assertDeferredRaises(d, NameError) # check b for clearBefore 81 d.addCallback(lambda _:self.me.pull( 0,'a'))81 d.addCallback(lambda _:self.me.pull('a', targets=0)) 82 82 d = self.assertDeferredRaises(d, NameError) # check a for clearAfter 83 83 return d 84 84 85 85 def testSimpleRetries(self): 86 d = self.me.execute( 0, 'i=0')86 d = self.me.execute('i=0', targets=0) 87 87 t = task.Task("i += 1\nassert i == 16", resultNames='i',retries=10) 88 88 t2 = task.Task("i += 1\nassert i == 16", resultNames='i',retries=10) … … 121 121 122 122 def testSetupNS(self): 123 d = self.me.execute( 0, 'a=0')123 d = self.me.execute('a=0', targets=0) 124 124 ns = dict(a=1, b=0) 125 125 t = task.Task("", setupNS=ns, resultNames=['a','b']) ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/tgenerator.py
r3033 r3036 166 166 167 167 def computeOutput(self, i): 168 return self.testCaseInstance.multiengine.execute( self.targets, i)168 return self.testCaseInstance.multiengine.execute(i, targets=self.targets) 169 169 170 170 class MultiEngineFailingExecuteTestGenerator(TestGenerator): … … 179 179 180 180 def computeOutput(self, i): 181 return self.testCaseInstance.multiengine.execute( self.targets, i)181 return self.testCaseInstance.multiengine.execute(i, targets=self.targets) 182 182 183 183 class MultiEngineGetResultTestGenerator(TestGenerator): … … 196 196 197 197 def computeOutput(self, i): 198 d = self.testCaseInstance.multiengine.execute( self.targets, i)198 d = self.testCaseInstance.multiengine.execute(i, targets=self.targets) 199 199 # d.addCallback(lambda r: self.testCaseInstance.multiengine.getResult(self.targets, r[0]['number'])) 200 200 return d
