Changeset 3072

Show
Ignore:
Timestamp:
02/23/08 16:55:14 (9 months ago)
Author:
bgranger
Message:

Completely refactored the pendingdeferred module. Now it no longer holds onto the actual original deferreds, but instead tracks their results. This means I don't have to hack on the internal details of the deferred objects, which was proving to be a very buggy thing. In particular, I was having lots of problems with the called/result attributes of the deferred behaiving in very subtle ways.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/error.py

    r3057 r3072  
    8888 
    8989class UnpickleableException(KernelError): 
     90    pass 
     91 
     92class AbortedPendingDeferredError(KernelError): 
    9093    pass 
    9194 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py

    r3071 r3072  
    559559            else: 
    560560                # Here we are going to use a _local_ PendingDeferredManager. 
    561                 deferred_id = self.pdm.get_next_deferred_id() 
     561                deferred_id = self.pdm.get_deferred_id() 
    562562                # This is the deferred we will return to the user that will fire 
    563563                # with the local deferred_id AFTER we have received the list of  
     
    572572                d.addCallback(do_it) 
    573573                # Now save the deferred to the final result 
    574                 self.pdm.save_pending_deferred(deferred_id, d) 
     574                self.pdm.save_pending_deferred(d, deferred_id) 
    575575                return d_to_return 
    576576 
     
    618618            else: 
    619619                # Here we are going to use a _local_ PendingDeferredManager. 
    620                 deferred_id = self.pdm.get_next_deferred_id() 
     620                deferred_id = self.pdm.get_deferred_id() 
    621621                # This is the deferred we will return to the user that will fire 
    622622                # with the local deferred_id AFTER we have received the list of  
     
    631631                d.addCallback(do_it) 
    632632                # Now save the deferred to the final result 
    633                 self.pdm.save_pending_deferred(deferred_id, d) 
     633                self.pdm.save_pending_deferred(d, deferred_id) 
    634634                return d_to_return 
    635635 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/pendingdeferred.py

    r3071 r3072  
    4545        """Manage pending deferreds.""" 
    4646 
    47         self.pendingDeferreds = {} 
     47        self.results = {} # Populated when results are ready 
     48        self.deferred_ids = [] # List of deferred ids I am managing 
     49        self.deferreds_to_callback = {} # dict of lists of deferreds to callback 
    4850         
    49     def get_next_deferred_id(self): 
    50         """Get the next available deferred id. 
     51    def get_deferred_id(self): 
     52        return guid.generate() 
     53     
     54    def quick_has_id(self, deferred_id): 
     55        return deferred_id in self.deferred_ids 
     56     
     57    def _save_result(self, result, deferred_id): 
     58        if self.quick_has_id(deferred_id): 
     59            self.results[deferred_id] = result 
     60            self._trigger_callbacks(deferred_id) 
     61     
     62    def _trigger_callbacks(self, deferred_id): 
     63        # Go through and call the waiting callbacks 
     64        result = self.results.get(deferred_id) 
     65        if result is not None:  # Only trigger if there is a result 
     66            try: 
     67                d = self.deferreds_to_callback.pop(deferred_id) 
     68            except KeyError: 
     69                d = None 
     70            if d is not None: 
     71                if isinstance(result, failure.Failure): 
     72                    d.errback(result) 
     73                else: 
     74                    d.callback(result) 
     75                self.delete_pending_deferred(deferred_id) 
     76                    
     77    def save_pending_deferred(self, d, deferred_id=None): 
     78        """Save the result of a deferred for later retrieval. 
    5179         
    52         :Returns: 
    53             deferredID : str 
    54                 The deferred id that the client should use for the next 
    55                 deferred is will save to me
     80        This works even if the deferred has not fired. 
     81         
     82        Only callbacks and errbacks applied to d before this method 
     83        is called will be called no the final result
    5684        """ 
    57  
    58         return guid.generate() 
    59          
    60     def quick_has_id(self, deferred_id): 
    61         return self.pendingDeferreds.has_key(deferred_id) 
    62          
    63     def save_pending_deferred(self, deferredID, d, callback=None, args=None, kwargs=None): 
    64         """Save a deferred to me by deferredID. 
    65          
    66         :Parameters: 
    67             deferredID : str 
    68                 A deferred id the client got by calling `get_next_deferred_id`. 
    69             d : Deferred 
    70                 The deferred to save. 
    71             callback : FunctionType 
    72                 A function to add to the callback of d before returning. 
    73             arguments: tuple 
    74                 A two tuple of (args, kwargs) to pass to the function 
    75         """ 
    76         pd = self.pendingDeferreds.get(deferredID) 
    77         if pd is not None: 
    78             self.remove_pending_deferred(deferredID) 
    79         self.pendingDeferreds[deferredID] = (d, callback, args, kwargs) 
    80          
    81     def remove_pending_deferred(self, deferredID): 
     85        if deferred_id is None: 
     86            deferred_id = self.get_deferred_id() 
     87        self.deferred_ids.append(deferred_id) 
     88        d.addBoth(self._save_result, deferred_id) 
     89        return deferred_id 
     90     
     91    def _protected_del(self, key, container): 
     92        try: 
     93            del container[key] 
     94        except Exception: 
     95            pass 
     96     
     97    def delete_pending_deferred(self, deferred_id): 
    8298        """Remove a deferred I am tracking and add a null Errback. 
    8399         
     
    86102                The id of a deferred that I am tracking. 
    87103        """ 
    88         pd_tuple = self.pendingDeferreds.get(deferredID) 
    89         if pd_tuple is not None: 
    90             pd = pd_tuple[0] 
    91             # Consume any remaining errors coming down the line. 
    92             pd.addErrback(lambda f: None) 
    93             del self.pendingDeferreds[deferredID] 
     104        if self.quick_has_id(deferred_id): 
     105            # First go through a errback any deferreds that are still waiting 
     106            d = self.deferreds_to_callback.get(deferred_id) 
     107            if d is not None: 
     108                d.errback(failure.Failure(error.AbortedPendingDeferredError("pending deferred has been deleted: %r"%deferred_id))) 
     109            # Now delete all references to this deferred_id 
     110            ind = self.deferred_ids.index(deferred_id) 
     111            self._protected_del(ind, self.deferred_ids) 
     112            self._protected_del(deferred_id, self.deferreds_to_callback) 
     113            self._protected_del(deferred_id, self.results)             
     114        else: 
     115            raise error.InvalidDeferredID('invalid deferred_id: %r' % deferred_id) 
    94116     
    95117    def clean_out_deferreds(self): 
    96118        """Remove all the deferreds I am tracking.""" 
    97         for k in self.pendingDeferreds.keys()
    98             self.remove_pending_deferred(k
     119        for did in self.deferred_ids
     120            self.delete_pending_deferred(did
    99121         
    100     def _deleteAndPassThrough(self, r, deferredID): 
    101         self.remove_pending_deferred(deferredID
     122    def _delete_and_pass_through(self, r, deferred_id): 
     123        self.delete_pending_deferred(deferred_id
    102124        return r 
    103125         
    104     def get_pending_deferred(self, deferredID, block): 
    105         """Get a pending deferred that I am tracking by deferredID. 
    106          
    107         :Parameters: 
    108             deferredID : int 
    109                 The id of a deferred I am tracking 
    110             block : boolean 
    111                 Should I block until the deferred has fired. 
    112         """ 
    113         (pd, cbfunc, cbfunc_args, cbfunc_kwargs) = self.pendingDeferreds.get(deferredID,(None,None,None,None)) 
    114         if pd is not None: 
    115             if pd.called: # called 
    116                 if isinstance(pd.result, failure.Failure): 
    117                     d_to_return = defer.fail(pd.result) 
    118                 elif isinstance(pd.result, defer.Deferred): 
    119                     # This section is required because sometimes the original deferred (pd) 
    120                     # has fired, but its result is another deferred, which has not yet fired. 
    121                     if not block and not pd.result.called: 
    122                         return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferredID))) 
    123                     else: 
    124                         # I am still not sure why I need to chain things in this way. 
    125                         # But I know it works. 
    126                         d_to_return = defer.Deferred() 
    127                         pd.chainDeferred(d_to_return) 
    128                 else: 
    129                     d_to_return = defer.succeed(pd.result) 
    130             else: # not called 
    131                 if block: 
    132                     pd.addCallback(self._deleteAndPassThrough, deferredID) 
    133                     d_to_return = defer.Deferred() 
    134                     pd.chainDeferred(d_to_return) 
    135                 else: # not block 
    136                     return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferredID))) 
    137              
    138             # Register the callback function with its args/kwargs if needed 
    139             if cbfunc is not None: 
    140                 if cbfunc_args is None: 
    141                     cbfunc_args = () 
    142                 if cbfunc_kwargs is None: 
    143                     cbfunc_kwargs = {} 
    144                 d_to_return.addCallback(cbfunc, *cbfunc_args, **cbfunc_kwargs) 
    145                  
    146             return d_to_return 
    147         else: 
    148             return defer.fail(failure.Failure(error.InvalidDeferredID('Invalid deferredID: ' + repr(deferredID)))) 
    149              
    150     def getAllPendingDeferreds(self): 
    151         dList = [] 
    152         keys = self.pdManager.pendingDeferreds.keys() 
    153         for k in keys: 
    154             dList.append(self.pdManager.get_pending_deferred(k, block=True)) 
    155         if len(dList) > 0:   
    156             return gatherBoth(dList, consumeErrors=1) 
    157         else: 
    158             return defer.succeed([None]) 
     126    def get_pending_deferred(self, deferred_id, block): 
     127        if not self.quick_has_id(deferred_id) or self.deferreds_to_callback.get(deferred_id) is not None: 
     128            return defer.fail(failure.Failure(error.InvalidDeferredID('invalid deferred_id: %r' + deferred_id))) 
     129        result = self.results.get(deferred_id) 
     130        if result is not None: 
     131            self.delete_pending_deferred(deferred_id) 
     132            if isinstance(result, failure.Failure): 
     133                return defer.fail(result) 
     134            else: 
     135                return defer.succeed(result) 
     136        else:  # Result is not ready 
     137            if block: 
     138                d = defer.Deferred() 
     139                self.deferreds_to_callback[deferred_id] = d 
     140                return d 
     141            else: 
     142                return defer.fail(failure.Failure(error.ResultNotCompleted("result not completed: %r" % deferred_id))) 
    159143 
    160  
    161 def two_phase(wrappedMethod): 
     144def two_phase(wrapped_method): 
    162145    """Wrap methods that return a deferred into a two phase process. 
    163146     
    164147    This transforms:: 
    165148     
    166         foo(arg1, arg2, ...) -> foo(block, arg1, arg2, ...). 
     149        foo(arg1, arg2, ...) -> foo(arg1, arg2,...,block=True). 
    167150     
    168     The wrapped method will then return a deferred to a deferredID.  This will 
     151    The wrapped method will then return a deferred to a deferred id.  This will 
    169152    only work on method of classes that inherit from `PendingDeferredManager`, 
    170153    as that class provides an API for  
     
    175158    """ 
    176159     
    177     def wrapperTwoPhase(pendingDeferredManager, *args, **kwargs): 
     160    def wrapper_two_phase(pdm, *args, **kwargs): 
    178161        try: 
    179162            block = kwargs.pop('block') 
     
    181164            block = True  # The default if not specified 
    182165        if block: 
    183             return wrappedMethod(pendingDeferredManager, *args, **kwargs) 
     166            return wrapped_method(pdm, *args, **kwargs) 
    184167        else: 
    185             deferredID = pendingDeferredManager.get_next_deferred_id() 
    186             d = wrappedMethod(pendingDeferredManager, *args, **kwargs) 
    187             pendingDeferredManager.save_pending_deferred(deferredID, d) 
    188             return defer.succeed(deferredID) 
     168            d = wrapped_method(pdm, *args, **kwargs) 
     169            deferred_id=pdm.save_pending_deferred(d) 
     170            return defer.succeed(deferred_id) 
    189171     
    190     return wrapperTwoPhase 
     172    return wrapper_two_phase 
    191173                 
    192174                 
     
    194176             
    195177                 
    196          
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_pendingdeferred.py

    r3070 r3072  
    1818#------------------------------------------------------------------------------- 
    1919 
     20from twisted.internet import defer 
     21from twisted.python import failure 
     22 
    2023from ipython1.testutils import tcommon 
    2124from ipython1.testutils.tcommon import * 
    22  
    2325from ipython1.testutils.util import DeferredTestCase 
    24  
    25 from twisted.internet import defer 
    2626import ipython1.kernel.pendingdeferred as pd 
    2727from ipython1.kernel import error 
     28from ipython1.kernel.util import printer 
    2829 
    2930 
     
    6162    def bar(self, bahz): 
    6263        return self.foo.bar(bahz) 
    63      
    64     def bam(self, bahz, block): 
    65         def process_it(r, extra1, extra2='hi'): 
    66             return r+' '+extra1+' '+extra2 
    67         d = self.foo.bar(bahz) 
    68         if block: 
    69             d.addCallback(process_it, 'extra1', extra2='extra2') 
    70             return d 
    71         else: 
    72             deferredID = self.get_next_deferred_id() 
    73             self.save_pending_deferred(deferredID, d, callback=process_it,  
    74                 args=('extra1',),kwargs=dict(extra2='extra2')) 
    75             return defer.succeed(deferredID) 
    7664 
    7765class PendingDeferredManagerTest(DeferredTestCase): 
    7866     
    7967    def setUp(self): 
    80         pass 
     68        self.pdm = pd.PendingDeferredManager() 
    8169         
    8270    def tearDown(self): 
    8371        pass 
    84          
     72     
    8573    def testBasic(self): 
    86         pdm = pd.PendingDeferredManager() 
    8774        dDict = {} 
     75        # Create 10 deferreds and save them 
    8876        for i in range(10): 
    8977            d = defer.Deferred() 
    90             did = pdm.get_next_deferred_id() 
    91             pdm.save_pending_deferred(did, d) 
     78            did = self.pdm.save_pending_deferred(d) 
    9279            dDict[did] = d 
     80        # Make sure they are begin saved 
     81        for k in dDict.keys(): 
     82            self.assert_(self.pdm.quick_has_id(k)) 
     83        # Get the pending deferred (block=True), then callback with 'foo' and compare 
    9384        for did in dDict.keys()[0:5]: 
    94             d = pdm.get_pending_deferred(did,block=True) 
     85            d = self.pdm.get_pending_deferred(did,block=True) 
    9586            dDict[did].callback('foo') 
    9687            d.addCallback(lambda r: self.assert_(r=='foo')) 
     88        # Get the pending deferreds with (block=False) and make sure ResultNotCompleted is raised 
    9789        for did in dDict.keys()[5:10]: 
    98             d = pdm.get_pending_deferred(did,block=False) 
     90            d = self.pdm.get_pending_deferred(did,block=False) 
    9991            d.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 
     92        # Now callback the last 5, get them and compare. 
    10093        for did in dDict.keys()[5:10]: 
    10194            dDict[did].callback('foo') 
    102             d = pdm.get_pending_deferred(did,block=False) 
     95            d = self.pdm.get_pending_deferred(did,block=False) 
    10396            d.addCallback(lambda r: self.assert_(r=='foo')) 
    104         for did in dDict.keys(): 
    105             d = pdm.get_pending_deferred(did,False) 
    106             d.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException)) 
    107      
    108     def testCallback(self): 
    109         foo = Foo() 
    110         pdm = TwoPhaseFoo(foo) 
    111         d = pdm.bam('bam', block=True) 
    112         d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 
    113         d.addCallback(lambda r: pdm.bam('bam', block=False)) 
    114         d.addCallback(lambda did: pdm.get_pending_deferred(did, True)) 
    115         d.addCallback(lambda r: self.assertEquals(r, "blahblah: bam extra1 extra2")) 
    116         return d      
     97     
     98    def test_save_then_delete(self): 
     99        d = defer.Deferred() 
     100        did = self.pdm.save_pending_deferred(d) 
     101        self.assert_(self.pdm.quick_has_id(did)) 
     102        self.pdm.delete_pending_deferred(did) 
     103        self.assert_(not self.pdm.quick_has_id(did)) 
     104     
     105    def test_save_get_delete(self): 
     106        d = defer.Deferred() 
     107        did = self.pdm.save_pending_deferred(d) 
     108        d2 = self.pdm.get_pending_deferred(did,True) 
     109        d2.addErrback(lambda f: self.assertRaises(error.AbortedPendingDeferredError, f.raiseException)) 
     110        self.pdm.delete_pending_deferred(did) 
     111        return d2 
     112    
     113    def test_double_get(self): 
     114        d = defer.Deferred() 
     115        did = self.pdm.save_pending_deferred(d) 
     116        d2 = self.pdm.get_pending_deferred(did,True) 
     117        d3 = self.pdm.get_pending_deferred(did,True) 
     118        d3.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException)) 
     119     
     120    def test_get_after_callback(self): 
     121        d = defer.Deferred() 
     122        did = self.pdm.save_pending_deferred(d) 
     123        d.callback('foo') 
     124        d2 = self.pdm.get_pending_deferred(did,True) 
     125        d2.addCallback(lambda r: self.assertEquals(r,'foo')) 
     126        self.assert_(not self.pdm.quick_has_id(did)) 
     127 
     128    def test_get_before_callback(self): 
     129        d = defer.Deferred() 
     130        did = self.pdm.save_pending_deferred(d) 
     131        d2 = self.pdm.get_pending_deferred(did,True) 
     132        d.callback('foo') 
     133        d2.addCallback(lambda r: self.assertEquals(r,'foo')) 
     134        self.assert_(not self.pdm.quick_has_id(did)) 
     135        d = defer.Deferred() 
     136        did = self.pdm.save_pending_deferred(d) 
     137        d2 = self.pdm.get_pending_deferred(did,True) 
     138        d2.addCallback(lambda r: self.assertEquals(r,'foo')) 
     139        d.callback('foo') 
     140        self.assert_(not self.pdm.quick_has_id(did)) 
     141     
     142    def test_get_after_errback(self): 
     143        class MyError(Exception): 
     144            pass 
     145        d = defer.Deferred() 
     146        did = self.pdm.save_pending_deferred(d) 
     147        d.errback(failure.Failure(MyError('foo'))) 
     148        d2 = self.pdm.get_pending_deferred(did,True) 
     149        d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 
     150        self.assert_(not self.pdm.quick_has_id(did)) 
     151 
     152    def test_get_before_errback(self): 
     153        class MyError(Exception): 
     154            pass 
     155        d = defer.Deferred() 
     156        did = self.pdm.save_pending_deferred(d) 
     157        d2 = self.pdm.get_pending_deferred(did,True) 
     158        d.errback(failure.Failure(MyError('foo'))) 
     159        d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 
     160        self.assert_(not self.pdm.quick_has_id(did)) 
     161        d = defer.Deferred() 
     162        did = self.pdm.save_pending_deferred(d) 
     163        d2 = self.pdm.get_pending_deferred(did,True) 
     164        d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 
     165        d.errback(failure.Failure(MyError('foo'))) 
     166        self.assert_(not self.pdm.quick_has_id(did)) 
    117167         
     168    def test_noresult_noblock(self): 
     169        d = defer.Deferred() 
     170        did = self.pdm.save_pending_deferred(d) 
     171        d2 = self.pdm.get_pending_deferred(did,False) 
     172        d2.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 
     173 
     174    def test_with_callbacks(self): 
     175        d = defer.Deferred() 
     176        d.addCallback(lambda r: r+' foo') 
     177        d.addCallback(lambda r: r+' bar') 
     178        did = self.pdm.save_pending_deferred(d) 
     179        d2 = self.pdm.get_pending_deferred(did,True) 
     180        d.callback('bam') 
     181        d2.addCallback(lambda r: self.assertEquals(r,'bam foo bar')) 
     182     
     183    def test_with_errbacks(self): 
     184        class MyError(Exception): 
     185            pass 
     186        d = defer.Deferred() 
     187        d.addCallback(lambda r: 'foo') 
     188        d.addErrback(lambda f: 'caught error') 
     189        did = self.pdm.save_pending_deferred(d) 
     190        d2 = self.pdm.get_pending_deferred(did,True) 
     191        d.errback(failure.Failure(MyError('bam'))) 
     192        d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException)) 
     193     
     194    def test_nested_deferreds(self): 
     195        d = defer.Deferred() 
     196        d2 = defer.Deferred() 
     197        d.addCallback(lambda r: d2) 
     198        did = self.pdm.save_pending_deferred(d) 
     199        d.callback('foo') 
     200        d3 = self.pdm.get_pending_deferred(did,False) 
     201        d3.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException)) 
     202        d2.callback('bar') 
     203        d3 = self.pdm.get_pending_deferred(did,False) 
     204        d3.addCallback(lambda r: self.assertEquals(r,'bar')) 
     205 
    118206#------------------------------------------------------------------------------- 
    119207# Regular Unittests