Changeset 3021

Show
Ignore:
Timestamp:
02/05/08 22:16:56 (10 months ago)
Author:
bgranger
Message:

Merging changes from ipython1-data-r3017 branch

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ipython1/trunk/docs/ChangeLog

    r2995 r3021  
     12008-02-05  Brian Granger <ellisonbg@gmail.com> 
     2 
     3    * ipython1.kernel.enginepb/multienginexmlrpc: Simplified certain  
     4    interfaces so that they are simply a pass.  This was done for 
     5    interfaces that 1) are not part of the public API and 2) 
     6    only have one implementation (and always will). 
     7 
     8    * ipython1.kernel: Created new pushFunction/pullFunctions in the API 
     9    of the engines.  These functions now have all the needed logic for 
     10    pushing/pulling functions.  The logic has been removed from regular 
     11    push/pull to make things more robust. 
     12     
     13    * ipython1.kernel.multiengine and friends:  Removed the  
     14    IMultiEngineCoordinator interface (gather/scatter) entirely from the 
     15    controller.  These things are now handled by the client using push/pull. 
     16    This was done to make the load on the controller lighter. 
     17 
    1182008-01-28  Brian Granger <ellisonbg@gmail.com> 
    219 
  • ipython1/trunk/docs/examples/rmt.ipy

    r2624 r3021  
    3434    rc.pushAll(num_per_engine=num_per_engine, N=N) 
    3535    rc.executeAll('diffs = ensembleDiffs(num_per_engine, N)') 
    36     pd = rc.gatherAll('diffs') 
    37     return pd.getResult(block=True
     36    # gather blocks always for now 
     37    return rc.gatherAll('diffs'
    3838 
    3939 
  • ipython1/trunk/ipython1/core/interpreter.py

    r2933 r3021  
    437437        ---------- 
    438438        **kwds 
    439         """           
    440          
     439        """ 
     440 
     441        self.user_ns.update(kwds) 
     442 
     443    def pushFunction(self, **kwds): 
    441444        # First set the func_globals for all functions to self.user_ns 
     445        new_kwds = {} 
    442446        for k, v in kwds.iteritems(): 
    443             if isinstance(v, FunctionType): 
    444                 kwds[k] = FunctionType(v.func_code, self.user_ns
    445  
    446         self.user_ns.update(kwds) 
     447            if not isinstance(v, FunctionType): 
     448                raise TypeError("function object expected"
     449            new_kwds[k] = FunctionType(v.func_code, self.user_ns) 
     450        self.user_ns.update(new_kwds)         
    447451 
    448452    def pack_exception(self,message,exc): 
    449453        message['exception'] = exc.__class__ 
    450454        message['exception_value'] = \ 
    451         traceback.format_exception_only(exc.__class__, exc) 
     455            traceback.format_exception_only(exc.__class__, exc) 
    452456 
    453457    def feed_block(self, source, filename='<input>', symbol='single'): 
     
    530534            return result 
    531535 
     536    def pullFunction(self, key): 
     537        return self.pull(key) 
    532538 
    533539    #### Interactive user API ################################################## 
  • ipython1/trunk/ipython1/kernel/enginepb.py

    r2839 r3021  
    5151from ipython1.kernel.pbutil import packageFailure, unpackageFailure, checkMessageSize 
    5252from ipython1.kernel.pbconfig import CHUNK_SIZE 
    53 from ipython1.kernel.util import gatherBoth 
     53from ipython1.kernel.util import printer 
     54from ipython1.kernel.twistedutil import gatherBoth 
    5455from ipython1.kernel import newserialized 
    5556from ipython1.kernel.error import PBMessageSizeError, ProtocolError 
    56 from ipython1.kernel import controllerservice, protocols 
     57from ipython1.kernel import controllerservice 
    5758from ipython1.kernel.controllerservice import IControllerBase 
    5859from ipython1.kernel.engineservice import \ 
     
    212213    will be returned instead. 
    213214    """ 
    214          
    215     def remote_getID(): 
    216         """return this.id""" 
    217      
    218     def remote_setID(id): 
    219         """set this.id""" 
    220      
    221     def remote_execute(lines): 
    222         """Execute lines of Python code. 
    223          
    224         Returns a deferred to a result dict. 
    225          
    226         Upon failure returns a pickled Failure. 
    227         """ 
    228      
    229     def remote_push(self, pNamespace): 
    230         """Push a namespace into the users namespace. 
    231          
    232         pNamespace is a pickled dict of key, object pairs.  
    233         """ 
    234      
    235     def remote_pull(*keys): 
    236         """Pull objects from a users namespace by keys. 
    237          
    238         Returns a deferred to a pickled tuple of objects.  If any single 
    239         key has a problem, the Failure of that will be returned. 
    240         """ 
    241      
    242     def remote_getResult(i=None): 
    243         """Get result i. 
    244          
    245         Returns a deferred to a pickled result dict. 
    246         """ 
    247      
    248     def remote_reset(): 
    249         """Reset the Engine.""" 
    250      
    251     def remote_kill(): 
    252         """Stop the Engines reactor.""" 
    253      
    254     def remote_keys(): 
    255         """Get variable names that are currently defined in the user's namespace. 
    256          
    257         Returns a deferred to a tuple of keys. 
    258         """ 
    259      
    260     def remote_pushSerialized(pNamespace): 
    261         """Push a dict of keys and serialized objects into users namespace. 
    262          
    263         @arg pNamespace: a pickle namespace of keys and serialized objects. 
    264         """ 
    265      
    266     def remote_pullSerialized(*keys): 
    267         """Pull objects from users namespace by key as Serialized. 
    268          
    269         Returns a deferred to a pickled dict of key, Serialized pairs. 
    270         """ 
    271      
    272     def remote_setProperties(pNamespace): 
    273         """update the properties for this engine""" 
    274      
    275     def remote_getProperties(*keys): 
    276         """pull a subdict of the properties for this engine by keys""" 
    277      
    278     def remote_hasProperties(*keys): 
    279         """check for keys in the properties for this engine""" 
    280      
    281     def remote_delProperties(*keys): 
    282         """remove values of the properties for this engine by keys""" 
    283      
    284     def remote_clearProperties(): 
    285         """clear the properties for this engine""" 
    286      
    287      
    288  
     215    pass 
     216     
    289217 
    290218class PBEngineReferenceFromService(pb.Referenceable, object): 
     
    316244     
    317245    def remote_execute(self, lines): 
     246        """Execute lines of Python code. 
     247         
     248        Returns a deferred to a result dict. 
     249         
     250        Upon failure returns a pickled Failure. 
     251        """ 
    318252        d = self.service.execute(lines) 
    319253        d.addErrback(packageFailure) 
     
    322256        #d.addCallback(lambda r: log.msg("Got result: " + str(r))) 
    323257        return d 
    324      
    325     def remote_setProperties(self, pNamespace): 
     258         
     259    #--------------------------------------------------------------------------- 
     260    # Old version of push 
     261    #--------------------------------------------------------------------------- 
     262         
     263    def remote_push(self, pNamespace): 
     264        """Push a namespace into the users namespace. 
     265         
     266        pNamespace is a pickled dict of key, object pairs.  
     267        """ 
    326268        try: 
    327269            namespace = pickle.loads(pNamespace) 
     
    329271            return defer.fail(failure.Failure()).addErrback(packageFailure) 
    330272        else: 
    331             return self.service.setProperties(**namespace).addErrback(packageFailure) 
    332      
    333     def remote_getProperties(self, *keys): 
    334         d = self.service.getProperties(*keys) 
    335         d.addCallback(pickle.dumps, 2) 
    336         d.addCallback(checkMessageSize, repr(keys)) 
    337         d.addErrback(packageFailure) 
    338         return d 
    339      
    340     def remote_hasProperties(self, *keys): 
    341         d = self.service.hasProperties(*keys) 
    342         d.addCallback(pickle.dumps, 2) 
    343         d.addCallback(checkMessageSize, repr(keys)) 
    344         d.addErrback(packageFailure) 
    345         return d 
    346      
    347     def remote_delProperties(self, *keys): 
    348         d = self.service.delProperties(*keys) 
    349         # d.addCallback(pickle.dumps, 2) 
    350         # d.addCallback(checkMessageSize, repr(keys)) 
    351         d.addErrback(packageFailure) 
    352         return d 
    353      
    354     def remote_clearProperties(self): 
    355         d = self.service.clearProperties() 
    356         # d.addCallback(pickle.dumps, 2) 
    357         # d.addCallback(checkMessageSize, repr(keys)) 
    358         d.addErrback(packageFailure) 
    359         return d 
    360      
    361      
    362     #--------------------------------------------------------------------------- 
    363     # Old version of push 
    364     #--------------------------------------------------------------------------- 
    365          
    366     def remote_push(self, pNamespace): 
    367         try: 
    368             namespace = pickle.loads(pNamespace) 
    369         except: 
    370             return defer.fail(failure.Failure()).addErrback(packageFailure) 
    371         else: 
    372             # The usage of globals() here is an attempt to bind any pickled functions 
    373             # to the globals of this module.  What we really want is to have it bound 
    374             # to the globals of the callers module.  This will require walking the  
    375             # stack.  BG 10/3/07. 
    376             namespace = uncanDict(namespace, globals()) 
    377273            return self.service.push(**namespace).addErrback(packageFailure) 
    378274     
     
    417313    # pull 
    418314    #---------------------------------------------------------------------------      
    419          
     315                 
    420316    def remote_pull(self, *keys): 
     317        """Pull objects from a users namespace by keys. 
     318         
     319        Returns a deferred to a pickled tuple of objects.  If any single 
     320        key has a problem, the Failure of that will be returned. 
     321        """ 
    421322        d = self.service.pull(*keys) 
     323        d.addCallback(pickle.dumps, 2) 
     324        d.addCallback(checkMessageSize, repr(keys)) 
     325        d.addErrback(packageFailure) 
     326        return d 
     327     
     328    # NOTE:  The paging version of pull is not being used right now  (BG 1/15/07). 
     329     
     330    def remote_pullPaging(self, key, collector): 
     331        d = self.service.pull(key) 
     332        d.addCallback(newserialized.serialize) 
     333        d.addCallbacks(self._startPaging, packageFailure, callbackArgs=(collector,)) 
     334        return d 
     335     
     336    def _startPaging(self, serial, collector): 
     337        pager = SerializedPager(collector, serial, chunkSize=CHUNK_SIZE) 
     338 
     339    #--------------------------------------------------------------------------- 
     340    # push/pullFuction 
     341    #--------------------------------------------------------------------------- 
     342     
     343    def remote_pushFunction(self, pNamespace): 
     344        try: 
     345            namespace = pickle.loads(pNamespace) 
     346        except: 
     347            return defer.fail(failure.Failure()).addErrback(packageFailure) 
     348        else: 
     349            # The usage of globals() here is an attempt to bind any pickled functions 
     350            # to the globals of this module.  What we really want is to have it bound 
     351            # to the globals of the callers module.  This will require walking the  
     352            # stack.  BG 10/3/07. 
     353            namespace = uncanDict(namespace, globals()) 
     354            return self.service.pushFunction(**namespace).addErrback(packageFailure) 
     355     
     356    def remote_pullFunction(self, *keys): 
     357        d = self.service.pullFunction(*keys) 
    422358        if len(keys)>1: 
    423359            d.addCallback(canSequence) 
     
    428364        d.addErrback(packageFailure) 
    429365        return d 
    430      
    431     # NOTE:  The paging version of pull is not being used right now  (BG 1/15/07). 
    432      
    433     def remote_pullPaging(self, key, collector): 
    434         d = self.service.pull(key) 
    435         d.addCallback(newserialized.serialize) 
    436         d.addCallbacks(self._startPaging, packageFailure, callbackArgs=(collector,)) 
    437         return d 
    438      
    439     def _startPaging(self, serial, collector): 
    440         pager = SerializedPager(collector, serial, chunkSize=CHUNK_SIZE) 
    441      
     366 
    442367    #--------------------------------------------------------------------------- 
    443368    # Other methods 
     
    445370     
    446371    def remote_getResult(self, i=None): 
     372        """Get result i. 
     373         
     374        Returns a deferred to a pickled result dict. 
     375        """ 
    447376        return self.service.getResult(i).addErrback(packageFailure) 
    448377     
    449378    def remote_reset(self): 
     379        """Reset the Engine.""" 
    450380        return self.service.reset().addErrback(packageFailure) 
    451381     
    452382    def remote_kill(self): 
     383        """Stop the Engines reactor.""" 
    453384        return self.service.kill().addErrback(packageFailure) 
    454385     
    455386    def remote_keys(self): 
     387        """Get variable names that are currently defined in the user's namespace. 
     388         
     389        Returns a deferred to a tuple of keys. 
     390        """ 
    456391        return self.service.keys().addErrback(packageFailure) 
    457392     
     
    461396     
    462397    def remote_pushSerialized(self, pNamespace): 
     398        """Push a dict of keys and serialized objects into users namespace. 
     399         
     400        @arg pNamespace: a pickle namespace of keys and serialized objects. 
     401        """ 
    463402        try: 
    464403            namespace = pickle.loads(pNamespace) 
     
    470409     
    471410    def remote_pullSerialized(self, *keys): 
     411        """Pull objects from users namespace by key as Serialized. 
     412         
     413        Returns a deferred to a pickled dict of key, Serialized pairs. 
     414        """ 
    472415        d = self.service.pullSerialized(*keys) 
    473416        d.addCallback(pickle.dumps, 2) 
    474417        d.addCallback(checkMessageSize, repr(keys)) 
     418        d.addErrback(packageFailure) 
     419        return d 
     420     
     421    #--------------------------------------------------------------------------- 
     422    # Properties interface 
     423    #--------------------------------------------------------------------------- 
     424     
     425    def remote_setProperties(self, pNamespace): 
     426        """Update the properties for this engine""" 
     427        try: 
     428            namespace = pickle.loads(pNamespace) 
     429        except: 
     430            return defer.fail(failure.Failure()).addErrback(packageFailure) 
     431        else: 
     432            return self.service.setProperties(**namespace).addErrback(packageFailure) 
     433     
     434    def remote_getProperties(self, *keys): 
     435        """Pull a subdict of the properties for this engine by keys""" 
     436        d = self.service.getProperties(*keys) 
     437        d.addCallback(pickle.dumps, 2) 
     438        d.addCallback(checkMessageSize, repr(keys)) 
     439        d.addErrback(packageFailure) 
     440        return d 
     441     
     442    def remote_hasProperties(self, *keys): 
     443        """Check for keys in the properties for this engine.""" 
     444        d = self.service.hasProperties(*keys) 
     445        d.addCallback(pickle.dumps, 2) 
     446        d.addCallback(checkMessageSize, repr(keys)) 
     447        d.addErrback(packageFailure) 
     448        return d 
     449     
     450    def remote_delProperties(self, *keys): 
     451        """Remove values of the properties for this engine by keys.""" 
     452        d = self.service.delProperties(*keys) 
     453        # d.addCallback(pickle.dumps, 2) 
     454        # d.addCallback(checkMessageSize, repr(keys)) 
     455        d.addErrback(packageFailure) 
     456        return d 
     457     
     458    def remote_clearProperties(self): 
     459        """Clear the properties for this engine.""" 
     460        d = self.service.clearProperties() 
     461        # d.addCallback(pickle.dumps, 2) 
     462        # d.addCallback(checkMessageSize, repr(keys)) 
    475463        d.addErrback(packageFailure) 
    476464        return d 
     
    571559    def pushOld(self, **namespace): 
    572560        try: 
    573             package = pickle.dumps(canDict(namespace), 2) 
     561            package = pickle.dumps(namespace, 2) 
    574562        except: 
    575563            return defer.fail(failure.Failure()) 
     
    619607    # pull 
    620608    #--------------------------------------------------------------------------- 
    621      
     609         
    622610    def pullOld(self, *keys): 
    623611        d = self.callRemote('pull', *keys) 
    624612        d.addCallback(self.checkReturnForFailure) 
    625613        d.addCallback(pickle.loads) 
    626         # The usage of globals() here is an attempt to bind any pickled functions 
    627         # to the globals of this module.  What we really want is to have it bound 
    628         # to the globals of the callers module.  This will require walking the  
    629         # stack.  BG 10/3/07. 
    630         if len(keys)>1: 
    631             d.addCallback(uncanSequence, globals()) 
    632         elif len(keys)>0: 
    633             d.addCallback(uncan, globals()) 
    634614        return d 
    635615     
     
    658638     
    659639    pull = pullOld 
     640 
     641    #--------------------------------------------------------------------------- 
     642    # push/pullFunction 
     643    #--------------------------------------------------------------------------- 
     644     
     645    def pushFunction(self, **namespace): 
     646        try: 
     647            package = pickle.dumps(canDict(namespace), 2) 
     648        except: 
     649            return defer.fail(failure.Failure()) 
     650        else: 
     651            package = checkMessageSize(package, namespace.keys()) 
     652            if isinstance(package, failure.Failure): 
     653                return defer.fail(package) 
     654            else: 
     655                d = self.callRemote('pushFunction', package) 
     656                return d.addCallback(self.checkReturnForFailure)     
     657     
     658    def pullFunction(self, *keys): 
     659        d = self.callRemote('pullFunction', *keys) 
     660        d.addCallback(self.checkReturnForFailure) 
     661        d.addCallback(pickle.loads) 
     662        # The usage of globals() here is an attempt to bind any pickled functions 
     663        # to the globals of this module.  What we really want is to have it bound 
     664        # to the globals of the callers module.  This will require walking the  
     665        # stack.  BG 10/3/07. 
     666        if len(keys)==1: 
     667            d.addCallback(uncan, globals()) 
     668        elif len(keys)>1: 
     669            d.addCallback(uncanSequence, globals())             
     670        return d 
    660671     
    661672    #--------------------------------------------------------------------------- 
  • ipython1/trunk/ipython1/kernel/engineservice.py

    r2866 r3021  
    4545from ipython1.core.interpreter import Interpreter 
    4646from ipython1.kernel import newserialized, error, util 
    47 from ipython1.kernel.util import gatherBoth, DeferredList 
     47from ipython1.kernel.twistedutil import gatherBoth, DeferredList 
    4848from ipython1.kernel import codeutil 
    49 from ipython1.kernel.pickleutil import can, uncan 
    5049 
    5150 
     
    9392        """Pulls values out of the user's namespace by keys. 
    9493         
    95         Returns a deferred to tuple objects or a single object. 
    96          
    97         Raises NameError is any one of objects does not exist. 
     94        Returns a deferred to a tuple objects or a single object. 
     95         
     96        Raises NameError if any one of objects doess not exist. 
     97        """ 
     98     
     99    def pushFunction(**namespace): 
     100        """Push a dict of key, function pairs into the user's namespace. 
     101         
     102        Returns a deferred to None or a failure.""" 
     103     
     104    def pullFunction(*keys): 
     105        """Pulls functions out of the user's namespace by keys. 
     106         
     107        Returns a deferred to a tuple of functions or a single function. 
     108         
     109        Raises NameError if any one of the functions does not exist. 
    98110        """ 
    99111     
     
    205217 
    206218 
    207  
    208  
    209219#------------------------------------------------------------------------------- 
    210220# Functions and classes to implement the EngineService 
    211221#------------------------------------------------------------------------------- 
     222 
     223 
    212224class StrictDict(dict): 
    213225    """This is a strict copying dictionary for use as the interface to the  
     
    403415method: pull(*keys) 
    404416keys = %r""" % (self.id, keys) 
    405         if len(keys) > 1: 
     417         
     418        if len(keys)==1: 
     419            return self.executeAndRaise(msg, self.shell.pull, keys[0]) 
     420        elif len(keys) > 1: 
    406421            pulledDeferreds = [] 
    407422            for key in keys: 
     
    414429                           consumeErrors=1) 
    415430            return dTotal 
    416         elif len(keys) > 0: 
    417             return self.executeAndRaise(msg, self.shell.pull, keys[0]) 
    418431        else: 
    419432            return self.executeAndRaise(msg, self.shell.pull, None) 
     433     
     434    def pushFunction(self, **namespace): 
     435        msg = """engine: %r 
     436method: pushFunction(**namespace) 
     437namespace.keys() = %r""" % (self.id, namespace.keys()) 
     438        d = self.executeAndRaise(msg, self.shell.pushFunction, **namespace) 
     439        return d 
     440     
     441    def pullFunction(self, *keys): 
     442        msg = """engine %r 
     443method: pullFunction(*keys) 
     444keys = %r""" % (self.id, keys) 
     445        if len(keys)==1: 
     446            return self.executeAndRaise(msg, self.shell.pullFunction, keys[0]) 
     447        elif len(keys) > 1: 
     448            pulledDeferreds = [] 
     449            for key in keys: 
     450                d = self.executeAndRaise(msg, self.shell.pullFunction, key) 
     451                pulledDeferreds.append(d) 
     452            # This will fire on the first failure and log the rest. 
     453            dTotal = gatherBoth(pulledDeferreds,  
     454                           fireOnOneErrback=1, 
     455                           logErrors=1,  
     456                           consumeErrors=1) 
     457            return dTotal 
     458        else: 
     459            return self.executeAndRaise(msg, self.shell.pullFunction, None)         
    420460     
    421461    def getResult(self, i=None): 
     
    507547            try: 
    508548                unserialized = newserialized.IUnSerialized(v) 
    509                 # The usage of globals() here is an attempt to bind any pickled functions 
    510                 # to the globals of this module.  What we really want is to have it bound 
    511                 # to the globals of the callers module.  This will require walking the  
    512                 # stack.  BG 10/3/07. 
    513                 ns[k] = uncan(unserialized.getObject(),globals()) 
     549                ns[k] = unserialized.getObject() 
    514550            except: 
    515551                return defer.fail() 
     
    520556method: pullSerialized(*keys) 
    521557keys = %r""" % (self.id, keys) 
    522         if len(keys) > 1: 
     558        if len(keys)==1: 
     559            key = keys[0] 
     560            d = self.executeAndRaise(msg, self.shell.pull, key) 
     561            d.addCallback(newserialized.serialize) 
     562            return d 
     563        elif len(keys)>1:             
    523564            pulledDeferreds = [] 
    524565            for key in keys: 
    525                 d = self.executeAndRaise(msg, self.shell.pull,key) 
     566                d = self.executeAndRaise(msg, self.shell.pull, key) 
    526567                pulledDeferreds.append(d) 
    527568            # This will fire on the first failure and log the rest. 
     
    535576                for v in values: 
    536577                    try: 
    537                         serials = newserialized.serialize(v
     578                        serials.append(newserialized.serialize(v)
    538579                    except: 
    539580                        return defer.fail(failure.Failure()) 
    540                 return dict(zip(keys, values)) 
     581                return dict(zip(keys, serials)) 
    541582            return packThemUp 
    542         else: 
    543             key = keys[0] 
    544             d = self.executeAndRaise(msg, self.shell.pull, key) 
    545             d.addCallback(newserialized.serialize) 
    546             return d 
    547      
    548      
     583 
     584 
    549585def queue(methodToQueue): 
    550586    def queuedMethod(this, *args, **kwargs): 
     
    685721        pass 
    686722         
     723    @queue 
     724    def pushFunction(self, **namespace): 
     725        pass       
     726     
     727    @queue 
     728    def pullFunction(self, *keys): 
     729        pass         
     730 
    687731    def getResult(self, i=None): 
    688732        if i is None: 
  • ipython1/trunk/ipython1/kernel/map.py

    r2199 r3021  
    8989            return genutil_flatten(listOfPartitions) 
    9090        # If we have scalars, just return listOfPartitions 
    91         return listOfPartitions     
     91        return listOfPartitions 
    9292 
    9393class RoundRobinMap(Map): 
  • ipython1/trunk/ipython1/kernel/multiengine.py

    r2837 r3021  
    3333from zope.interface import Interface, implements, Attribute 
    3434 
    35 from ipython1.kernel.util import gatherBoth 
    36 from ipython1.kernel import map as Map 
     35from ipython1.kernel.util import printer 
     36from ipython1.kernel.twistedutil import gatherBoth 
    3737from ipython1.kernel import error 
    3838from ipython1.kernel.controllerservice import \ 
     
    126126    def pullAll(*keys): 
    127127        """Pull from all targets.""" 
     128           
     129    def pushFunction(targets, **namespace): 
     130        """""" 
     131         
     132    def pushFunctionAll(**namespace): 
     133        """""" 
     134         
     135    def pullFunction(targets, *keys): 
     136        """""" 
     137         
     138    def pullFunctionAll(*keys): 
     139        """""" 
    128140                 
    129141    def getResult(targets, i=None): 
     
    210222        """get all the properties dicts.""" 
    211223     
    212      
    213 class IEngineCoordinator(Interface): 
    214     """Methods that work on multiple engines explicitly.""" 
    215          
    216     #--------------------------------------------------------------------------- 
    217     # Coordinated methods 
    218     #--------------------------------------------------------------------------- 
    219           
    220     def scatter(targets, key, seq, style='basic', flatten=False): 
    221         """Partition and distribute a sequence to targets. 
    222          
    223         :Parameters: 
    224             key : str 
    225                 The variable name to call the scattered sequence. 
    226             seq : list, tuple, array 
    227                 The sequence to scatter.  The type should be preserved. 
    228             style : string 
    229                 A specification of how the sequence is partitioned.  Currently  
    230                 only 'basic' is implemented. 
    231             flatten : boolean 
    232                 Should single element sequences be converted to scalars. 
    233         """ 
    234      
    235     def scatterAll(key, seq, style='basic', flatten=False): 
    236         """Scatter to all targets.""" 
    237      
    238     def gather(targets, key, style='basic'): 
    239         """Gather object key from targets. 
    240  
    241         :Parameters: 
    242             key : string 
    243                 The name of a sequence on the targets to gather. 
    244             style : string 
    245                 A specification of how the sequence is partitioned.  Currently  
    246                 only 'basic' is implemented.                 
    247         """ 
    248      
    249     def gatherAll(key, style='basic'): 
    250         """Gather from all targets.""" 
    251          
    252          
    253 class IMultiEngine(IEngineMultiplexer,  
    254                    IEngineCoordinator): 
     224 
     225class IMultiEngine(IEngineMultiplexer): 
    255226    """A controller that exposes an explicit interface to all of its engines. 
    256227     
     
    400371        return self.pull('all', *keys) 
    401372     
     373    def pushFunction(self, targets, **ns): 
     374        return self._performOnEnginesAndGatherBoth('pushFunction', targets, **ns) 
     375         
     376    def pushFunctionAll(self, **ns): 
     377        return self.pushFunction('all', **ns) 
     378         
     379    def pullFunction(self, targets, *keys): 
     380        return self._performOnEnginesAndGatherBoth('pullFunction', targets, *keys) 
     381     
     382    def pullFunctionAll(self, *keys): 
     383        return self.pullFunction('all', *keys) 
     384     
    402385    def getResult(self, targets, i=None): 
    403386        return self._performOnEnginesAndGatherBoth('getResult', targets, i) 
     
    569552        return self.getProperties('all') 
    570553     
    571     #--------------------------------------------------------------------------- 
    572     # IEngineCoordinator methods 
    573     #--------------------------------------------------------------------------- 
    574  
    575     def scatter(self, targets, key, seq, style='basic', flatten=False): 
    576         log.msg("Scattering %r to %r" % (key, targets)) 
    577         try: 
    578             engines = self.engineList(targets) 
    579         except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): 
    580             return defer.fail(failure.Failure()) 
    581         else: 
    582             nEngines = len(engines)     
    583             mapClass = Map.styles[style] 
    584             mapObject = mapClass() 
    585             dList = [] 
    586             for index, engine in enumerate(engines): 
    587                 partition = mapObject.getPartition(seq, index, nEngines) 
    588                 if flatten and len(partition) == 1:     
    589                     dList.append(engine.push(**{key: partition[0]})) 
    590                 else: 
    591                     dList.append(engine.push(**{key: partition})) 
    592             return gatherBoth(dList,  
    593                               fireOnOneErrback=1, 
    594                               consumeErrors=1, 
    595                               logErrors=0)   
    596                                
    597     def scatterAll(self, key, seq, style='basic', flatten=False): 
    598         return self.scatter('all', key, seq, style, flatten) 
    599      
    600     def gather(self, targets, key, style='basic'): 
    601         """gather a distributed object, and reassemble it""" 
    602         log.msg("Gathering %s from %r" % (key, targets)) 
    603         try: 
    604              engines = self.engineList(targets) 
    605         except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): 
    606             return defer.fail(failure.Failure()) 
    607         else: 
    608             nEngines = len(engines)     
    609             dList = [] 
    610             for e in engines: 
    611                 dList.append(e.pull(key))     
    612             mapClass = Map.styles[style] 
    613             mapObject = mapClass() 
    614             d = gatherBoth(dList,  
    615                            fireOnOneErrback=1, 
    616                            consumeErrors=1, 
    617                            logErrors=0)   
    618             return d.addCallback(mapObject.joinPartitions) 
    619      
    620     def gatherAll(self, key, style='basic'): 
    621         return self.gather('all', key, style) 
    622  
    623554 
    624555components.registerAdapter(MultiEngine,  
     
    674605    @twoPhase 
    675606    def pull(self, targets, *keys): 
    676         return self.multiengine.pull(targets, *keys) 
    677      
     607        d = self.multiengine.pull(targets, *keys) 
     608        return d 
     609 
     610    @twoPhase 
     611    def pushFunction(self, targets, **namespace): 
     612        return self.multiengine.pushFunction(targets, **namespace) 
     613     
     614    @twoPhase 
     615    def pullFunction(self, targets, *keys): 
     616        d = self.multiengine.pullFunction(targets, *keys) 
     617        return d 
     618 
    678619    @twoPhase 
    679620    def getResult(self, targets, i=None): 
     
    728669        return self.multiengine.clearProperties(targets, *keys) 
    729670     
    730     @twoPhase 
    731     def scatter(self, targets, key, seq, style='basic', flatten=False): 
    732         return self.multiengine.scatter(targets, key, seq, style, flatten) 
    733      
    734     @twoPhase 
    735     def gather(self, targets, key, style='basic'): 
    736         return self.multiengine.gather(targets, key, style) 
    737      
    738671    #--------------------------------------------------------------------------- 
    739672    # IMultiEngine methods 
  • ipython1/trunk/ipython1/kernel/multiengineclient.py

    r2839 r3021  
    3030from ipython1.kernel import error 
    3131from ipython1.kernel.parallelfunction import ParallelFunction 
     32from ipython1.kernel import map as Map 
    3233 
    3334 
     
    422423        if self.block: 
    423424            if isinstance(func, FunctionType): 
    424                 self.push(targets, _ipython_map_func=func) 
     425                self.pushFunction(targets, _ipython_map_func=func) 
    425426                sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' 
    426427            elif isinstance(func, str): 
     
    435436        else: 
    436437            if isinstance(func, FunctionType): 
    437                 pd1 = self.push(targets, _ipython_map_func=func) 
     438                pd1 = self.pushFunction(targets, _ipython_map_func=func) 
    438439                sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' 
    439440            elif isinstance(func, str): 
     
    489490        """ 
    490491        return self.parallelize('all', functionName) 
     492 
     493 
     494#------------------------------------------------------------------------------- 
     495# MultiEngineCoordinator 
     496#------------------------------------------------------------------------------- 
     497 
     498class MultiEngineCoordinator(object): 
     499     
     500    def scatter(self, targets, key, seq, style='basic', flatten=False): 
     501        """Partition and distribute a sequence to a set of targets/engines. 
     502         
     503        This method partitions a Python sequence and then pushes the partitions 
     504        to a set of engines. 
     505         
     506        :Parameters: 
     507            targets : int, list or 'all' 
     508                The engine ids the action will apply to.  Call `getIDs` to see 
     509                a list of currently available engines. 
     510            key : str 
     511                What to call the partitions on the engines. 
     512            seq : list, tuple or numpy array 
     513                The sequence to be partitioned and pushed. 
     514            style : str 
     515                The style of partitioning to use.  Only 'basic' is supported 
     516            flatten : boolean 
     517                Should length 1 partitions be flattened to scalars upon pushing. 
     518        """ 
     519        self._checkClientID() 
     520        localBlock = self._reallyBlock() 
     521        if targets=='all': 
     522            engines = self.getIDs() 
     523        else: 
     524            engines = targets 
     525        nEngines = len(engines)     
     526        mapClass = Map.styles[style] 
     527        mapObject = mapClass() 
     528        results = [] 
     529        for index, engine in enumerate(engines): 
     530            partition = mapObject.getPartition(seq, index, nEngines) 
     531            if flatten and len(partition) == 1:     
     532                results.append(self.push(engine, **{key: partition[0]})) 
     533            else: 
     534                results.append(self.push(engine, **{key: partition}))            
     535        if not localBlock: 
     536            self.barrier(*results) 
     537        return nEngines*[None] 
     538         
     539    def scatterAll(self, key, seq, style='basic', flatten=False): 
     540        """Partition and distribute a sequence to all targets/engines. 
     541         
     542        See the docstring for `scatter` for full details. 
     543        """ 
     544        return self.scatter('all', key, seq, style, flatten) 
     545     
     546    scatter_all = scatterAll 
     547     
     548    def gather(self, targets, key, style='basic'): 
     549        """Gather a set of sequence partitions that are distributed on targets. 
     550         
     551        This method is the inverse of `scatter`