Changeset 3072
- Timestamp:
- 02/23/08 16:55:14 (9 months ago)
- Files:
-
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/error.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py (modified) (4 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/pendingdeferred.py (modified) (5 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_pendingdeferred.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/branches/ipython1-client-r3021/ipython1/kernel/error.py
r3057 r3072 88 88 89 89 class UnpickleableException(KernelError): 90 pass 91 92 class AbortedPendingDeferredError(KernelError): 90 93 pass 91 94 ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py
r3071 r3072 559 559 else: 560 560 # Here we are going to use a _local_ PendingDeferredManager. 561 deferred_id = self.pdm.get_ next_deferred_id()561 deferred_id = self.pdm.get_deferred_id() 562 562 # This is the deferred we will return to the user that will fire 563 563 # with the local deferred_id AFTER we have received the list of … … 572 572 d.addCallback(do_it) 573 573 # Now save the deferred to the final result 574 self.pdm.save_pending_deferred(d eferred_id,d)574 self.pdm.save_pending_deferred(d, deferred_id) 575 575 return d_to_return 576 576 … … 618 618 else: 619 619 # Here we are going to use a _local_ PendingDeferredManager. 620 deferred_id = self.pdm.get_ next_deferred_id()620 deferred_id = self.pdm.get_deferred_id() 621 621 # This is the deferred we will return to the user that will fire 622 622 # with the local deferred_id AFTER we have received the list of … … 631 631 d.addCallback(do_it) 632 632 # Now save the deferred to the final result 633 self.pdm.save_pending_deferred(d eferred_id,d)633 self.pdm.save_pending_deferred(d, deferred_id) 634 634 return d_to_return 635 635 ipython1/branches/ipython1-client-r3021/ipython1/kernel/pendingdeferred.py
r3071 r3072 45 45 """Manage pending deferreds.""" 46 46 47 self.pendingDeferreds = {} 47 self.results = {} # Populated when results are ready 48 self.deferred_ids = [] # List of deferred ids I am managing 49 self.deferreds_to_callback = {} # dict of lists of deferreds to callback 48 50 49 def get_next_deferred_id(self): 50 """Get the next available deferred id. 51 def get_deferred_id(self): 52 return guid.generate() 53 54 def quick_has_id(self, deferred_id): 55 return deferred_id in self.deferred_ids 56 57 def _save_result(self, result, deferred_id): 58 if self.quick_has_id(deferred_id): 59 self.results[deferred_id] = result 60 self._trigger_callbacks(deferred_id) 61 62 def _trigger_callbacks(self, deferred_id): 63 # Go through and call the waiting callbacks 64 result = self.results.get(deferred_id) 65 if result is not None: # Only trigger if there is a result 66 try: 67 d = self.deferreds_to_callback.pop(deferred_id) 68 except KeyError: 69 d = None 70 if d is not None: 71 if isinstance(result, failure.Failure): 72 d.errback(result) 73 else: 74 d.callback(result) 75 self.delete_pending_deferred(deferred_id) 76 77 def save_pending_deferred(self, d, deferred_id=None): 78 """Save the result of a deferred for later retrieval. 51 79 52 :Returns:53 deferredID : str54 The deferred id that the client should use for the next55 deferred is will save to me.80 This works even if the deferred has not fired. 81 82 Only callbacks and errbacks applied to d before this method 83 is called will be called no the final result. 56 84 """ 57 58 return guid.generate() 59 60 def quick_has_id(self, deferred_id): 61 return self.pendingDeferreds.has_key(deferred_id) 62 63 def save_pending_deferred(self, deferredID, d, callback=None, args=None, kwargs=None): 64 """Save a deferred to me by deferredID. 65 66 :Parameters: 67 deferredID : str 68 A deferred id the client got by calling `get_next_deferred_id`. 69 d : Deferred 70 The deferred to save. 71 callback : FunctionType 72 A function to add to the callback of d before returning. 73 arguments: tuple 74 A two tuple of (args, kwargs) to pass to the function 75 """ 76 pd = self.pendingDeferreds.get(deferredID) 77 if pd is not None: 78 self.remove_pending_deferred(deferredID) 79 self.pendingDeferreds[deferredID] = (d, callback, args, kwargs) 80 81 def remove_pending_deferred(self, deferredID): 85 if deferred_id is None: 86 deferred_id = self.get_deferred_id() 87 self.deferred_ids.append(deferred_id) 88 d.addBoth(self._save_result, deferred_id) 89 return deferred_id 90 91 def _protected_del(self, key, container): 92 try: 93 del container[key] 94 except Exception: 95 pass 96 97 def delete_pending_deferred(self, deferred_id): 82 98 """Remove a deferred I am tracking and add a null Errback. 83 99 … … 86 102 The id of a deferred that I am tracking. 87 103 """ 88 pd_tuple = self.pendingDeferreds.get(deferredID) 89 if pd_tuple is not None: 90 pd = pd_tuple[0] 91 # Consume any remaining errors coming down the line. 92 pd.addErrback(lambda f: None) 93 del self.pendingDeferreds[deferredID] 104 if self.quick_has_id(deferred_id): 105 # First go through a errback any deferreds that are still waiting 106 d = self.deferreds_to_callback.get(deferred_id) 107 if d is not None: 108 d.errback(failure.Failure(error.AbortedPendingDeferredError("pending deferred has been deleted: %r"%deferred_id))) 109 # Now delete all references to this deferred_id 110 ind = self.deferred_ids.index(deferred_id) 111 self._protected_del(ind, self.deferred_ids) 112 self._protected_del(deferred_id, self.deferreds_to_callback) 113 self._protected_del(deferred_id, self.results) 114 else: 115 raise error.InvalidDeferredID('invalid deferred_id: %r' % deferred_id) 94 116 95 117 def clean_out_deferreds(self): 96 118 """Remove all the deferreds I am tracking.""" 97 for k in self.pendingDeferreds.keys():98 self. remove_pending_deferred(k)119 for did in self.deferred_ids: 120 self.delete_pending_deferred(did) 99 121 100 def _delete AndPassThrough(self, r, deferredID):101 self. remove_pending_deferred(deferredID)122 def _delete_and_pass_through(self, r, deferred_id): 123 self.delete_pending_deferred(deferred_id) 102 124 return r 103 125 104 def get_pending_deferred(self, deferredID, block): 105 """Get a pending deferred that I am tracking by deferredID. 106 107 :Parameters: 108 deferredID : int 109 The id of a deferred I am tracking 110 block : boolean 111 Should I block until the deferred has fired. 112 """ 113 (pd, cbfunc, cbfunc_args, cbfunc_kwargs) = self.pendingDeferreds.get(deferredID,(None,None,None,None)) 114 if pd is not None: 115 if pd.called: # called 116 if isinstance(pd.result, failure.Failure): 117 d_to_return = defer.fail(pd.result) 118 elif isinstance(pd.result, defer.Deferred): 119 # This section is required because sometimes the original deferred (pd) 120 # has fired, but its result is another deferred, which has not yet fired. 121 if not block and not pd.result.called: 122 return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferredID))) 123 else: 124 # I am still not sure why I need to chain things in this way. 125 # But I know it works. 126 d_to_return = defer.Deferred() 127 pd.chainDeferred(d_to_return) 128 else: 129 d_to_return = defer.succeed(pd.result) 130 else: # not called 131 if block: 132 pd.addCallback(self._deleteAndPassThrough, deferredID) 133 d_to_return = defer.Deferred() 134 pd.chainDeferred(d_to_return) 135 else: # not block 136 return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferredID))) 137 138 # Register the callback function with its args/kwargs if needed 139 if cbfunc is not None: 140 if cbfunc_args is None: 141 cbfunc_args = () 142 if cbfunc_kwargs is None: 143 cbfunc_kwargs = {} 144 d_to_return.addCallback(cbfunc, *cbfunc_args, **cbfunc_kwargs) 145 146 return d_to_return 147 else: 148 return defer.fail(failure.Failure(error.InvalidDeferredID('Invalid deferredID: ' + repr(deferredID)))) 149 150 def getAllPendingDeferreds(self): 151 dList = [] 152 keys = self.pdManager.pendingDeferreds.keys() 153 for k in keys: 154 dList.append(self.pdManager.get_pending_deferred(k, block=True)) 155 if len(dList) > 0: 156 return gatherBoth(dList, consumeErrors=1) 157 else: 158 return defer.succeed([None]) 126 def get_pending_deferred(self, deferred_id, block): 127 if not self.quick_has_id(deferred_id) or self.deferreds_to_callback.get(deferred_id) is not None: 128 return defer.fail(failure.Failure(error.InvalidDeferredID('invalid deferred_id: %r' + deferred_id))) 129 result = self.results.get(deferred_id) 130 if result is not None: 131 self.delete_pending_deferred(deferred_id) 132 if isinstance(result, failure.Failure): 133 return defer.fail(result) 134 else: 135 return defer.succeed(result) 136 else: # Result is not ready 137 if block: 138 d = defer.Deferred() 139 self.deferreds_to_callback[deferred_id] = d 140 return d 141 else: 142 return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferred_id))) 159 143 160 161 def two_phase(wrappedMethod): 144 def two_phase(wrapped_method): 162 145 """Wrap methods that return a deferred into a two phase process. 163 146 164 147 This transforms:: 165 148 166 foo(arg1, arg2, ...) -> foo( block, arg1, arg2, ...).149 foo(arg1, arg2, ...) -> foo(arg1, arg2,...,block=True). 167 150 168 The wrapped method will then return a deferred to a deferred ID. This will151 The wrapped method will then return a deferred to a deferred id. This will 169 152 only work on method of classes that inherit from `PendingDeferredManager`, 170 153 as that class provides an API for … … 175 158 """ 176 159 177 def wrapper TwoPhase(pendingDeferredManager, *args, **kwargs):160 def wrapper_two_phase(pdm, *args, **kwargs): 178 161 try: 179 162 block = kwargs.pop('block') … … 181 164 block = True # The default if not specified 182 165 if block: 183 return wrapped Method(pendingDeferredManager, *args, **kwargs)166 return wrapped_method(pdm, *args, **kwargs) 184 167 else: 185 deferredID = pendingDeferredManager.get_next_deferred_id() 186 d = wrappedMethod(pendingDeferredManager, *args, **kwargs) 187 pendingDeferredManager.save_pending_deferred(deferredID, d) 188 return defer.succeed(deferredID) 168 d = wrapped_method(pdm, *args, **kwargs) 169 deferred_id=pdm.save_pending_deferred(d) 170 return defer.succeed(deferred_id) 189 171 190 return wrapper TwoPhase172 return wrapper_two_phase 191 173 192 174 … … 194 176 195 177 196 ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_pendingdeferred.py
r3070 r3072 18 18 #------------------------------------------------------------------------------- 19 19 20 from twisted.internet import defer 21 from twisted.python import failure 22 20 23 from ipython1.testutils import tcommon 21 24 from ipython1.testutils.tcommon import * 22 23 25 from ipython1.testutils.util import DeferredTestCase 24 25 from twisted.internet import defer26 26 import ipython1.kernel.pendingdeferred as pd 27 27 from ipython1.kernel import error 28 from ipython1.kernel.util import printer 28 29 29 30 … … 61 62 def bar(self, bahz): 62 63 return self.foo.bar(bahz) 63 64 def bam(self, bahz, block):65 def process_it(r, extra1, extra2='hi'):66 return r+' '+extra1+' '+extra267 d = self.foo.bar(bahz)68 if block:69 d.addCallback(process_it, 'extra1', extra2='extra2')70 return d71 else:72 deferredID = self.get_next_deferred_id()73 self.save_pending_deferred(deferredID, d, callback=process_it,74 args=('extra1',),kwargs=dict(extra2='extra2'))75 return defer.succeed(deferredID)76 64 77 65 class PendingDeferredManagerTest(DeferredTestCase): 78 66 79 67 def setUp(self): 80 pass68 self.pdm = pd.PendingDeferredManager() 81 69 82 70 def tearDown(self): 83 71 pass 84 72 85 73 def testBasic(self): 86 pdm = pd.PendingDeferredManager()87 74 dDict = {} 75 # Create 10 deferreds and save them 88 76 for i in range(10): 89 77 d = defer.Deferred() 90 did = pdm.get_next_deferred_id() 91 pdm.save_pending_deferred(did, d) 78 did = self.pdm.save_pending_deferred(d) 92 79 dDict[did] = d 80 # Make sure they are begin saved 81 for k in dDict.keys(): 82 self.assert_(self.pdm.quick_has_id(k)) 83 # Get the pending deferred (block=True), then callback with 'foo' and compare 93 84 for did in dDict.keys()[0:5]: 94 d = pdm.get_pending_deferred(did,block=True)85 d = self.pdm.get_pending_deferred(did,block=True) 95 86 dDict[did].callback('foo') 96 87 d.addCallback(lambda r: self.assert_(r=='foo')) 88 # Get the pending deferreds with (block=False) and make sure ResultNotCompleted is raised 97 89 for did in dDict.keys()[5:10]: 98 d = pdm.get_pending_deferred(did,block=False)90 d = self.pdm.get_pending_deferred(did,block=False) 99 91 d.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 92 # Now callback the last 5, get them and compare. 100 93 for did in dDict.keys()[5:10]: 101 94 dDict[did].callback('foo') 102 d = pdm.get_pending_deferred(did,block=False)95 d = self.pdm.get_pending_deferred(did,block=False) 103 96 d.addCallback(lambda r: self.assert_(r=='foo')) 104 for did in dDict.keys(): 105 d = pdm.get_pending_deferred(did,False) 106 d.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException)) 107 108 def testCallback(self): 109 foo = Foo() 110 pdm = TwoPhaseFoo(foo) 111 d = pdm.bam('bam', block=True) 112 d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 113 d.addCallback(lambda r: pdm.bam('bam', block=False)) 114 d.addCallback(lambda did: pdm.get_pending_deferred(did, True)) 115 d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 116 return d 97 98 def test_save_then_delete(self): 99 d = defer.Deferred() 100 did = self.pdm.save_pending_deferred(d) 101 self.assert_(self.pdm.quick_has_id(did)) 102 self.pdm.delete_pending_deferred(did) 103 self.assert_(not self.pdm.quick_has_id(did)) 104 105 def test_save_get_delete(self): 106 d = defer.Deferred() 107 did = self.pdm.save_pending_deferred(d) 108 d2 = self.pdm.get_pending_deferred(did,True) 109 d2.addErrback(lambda f: self.assertRaises(error.AbortedPendingDeferredError, f.raiseException)) 110 self.pdm.delete_pending_deferred(did) 111 return d2 112 113 def test_double_get(self): 114 d = defer.Deferred() 115 did = self.pdm.save_pending_deferred(d) 116 d2 = self.pdm.get_pending_deferred(did,True) 117 d3 = self.pdm.get_pending_deferred(did,True) 118 d3.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException)) 119 120 def test_get_after_callback(self): 121 d = defer.Deferred() 122 did = self.pdm.save_pending_deferred(d) 123 d.callback('foo') 124 d2 = self.pdm.get_pending_deferred(did,True) 125 d2.addCallback(lambda r: self.assertEquals(r,'foo')) 126 self.assert_(not self.pdm.quick_has_id(did)) 127 128 def test_get_before_callback(self): 129 d = defer.Deferred() 130 did = self.pdm.save_pending_deferred(d) 131 d2 = self.pdm.get_pending_deferred(did,True) 132 d.callback('foo') 133 d2.addCallback(lambda r: self.assertEquals(r,'foo')) 134 self.assert_(not self.pdm.quick_has_id(did)) 135 d = defer.Deferred() 136 did = self.pdm.save_pending_deferred(d) 137 d2 = self.pdm.get_pending_deferred(did,True) 138 d2.addCallback(lambda r: self.assertEquals(r,'foo')) 139 d.callback('foo') 140 self.assert_(not self.pdm.quick_has_id(did)) 141 142 def test_get_after_errback(self): 143 class MyError(Exception): 144 pass 145 d = defer.Deferred() 146 did = self.pdm.save_pending_deferred(d) 147 d.errback(failure.Failure(MyError('foo'))) 148 d2 = self.pdm.get_pending_deferred(did,True) 149 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 150 self.assert_(not self.pdm.quick_has_id(did)) 151 152 def test_get_before_errback(self): 153 class MyError(Exception): 154 pass 155 d = defer.Deferred() 156 did = self.pdm.save_pending_deferred(d) 157 d2 = self.pdm.get_pending_deferred(did,True) 158 d.errback(failure.Failure(MyError('foo'))) 159 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 160 self.assert_(not self.pdm.quick_has_id(did)) 161 d = defer.Deferred() 162 did = self.pdm.save_pending_deferred(d) 163 d2 = self.pdm.get_pending_deferred(did,True) 164 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 165 d.errback(failure.Failure(MyError('foo'))) 166 self.assert_(not self.pdm.quick_has_id(did)) 117 167 168 def test_noresult_noblock(self): 169 d = defer.Deferred() 170 did = self.pdm.save_pending_deferred(d) 171 d2 = self.pdm.get_pending_deferred(did,False) 172 d2.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 173 174 def test_with_callbacks(self): 175 d = defer.Deferred() 176 d.addCallback(lambda r: r+' foo') 177 d.addCallback(lambda r: r+' bar') 178 did = self.pdm.save_pending_deferred(d) 179 d2 = self.pdm.get_pending_deferred(did,True) 180 d.callback('bam') 181 d2.addCallback(lambda r: self.assertEquals(r,'bam foo bar')) 182 183 def test_with_errbacks(self): 184 class MyError(Exception): 185 pass 186 d = defer.Deferred() 187 d.addCallback(lambda r: 'foo') 188 d.addErrback(lambda f: 'caught error') 189 did = self.pdm.save_pending_deferred(d) 190 d2 = self.pdm.get_pending_deferred(did,True) 191 d.errback(failure.Failure(MyError('bam'))) 192 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 193 194 def test_nested_deferreds(self): 195 d = defer.Deferred() 196 d2 = defer.Deferred() 197 d.addCallback(lambda r: d2) 198 did = self.pdm.save_pending_deferred(d) 199 d.callback('foo') 200 d3 = self.pdm.get_pending_deferred(did,False) 201 d3.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 202 d2.callback('bar') 203 d3 = self.pdm.get_pending_deferred(did,False) 204 d3.addCallback(lambda r: self.assertEquals(r,'bar')) 205 118 206 #------------------------------------------------------------------------------- 119 207 # Regular Unittests
