Changeset 3021
- Timestamp:
- 02/05/08 22:16:56 (10 months ago)
- Files:
-
- ipython1/trunk/docs/ChangeLog (modified) (1 diff)
- ipython1/trunk/docs/examples/rmt.ipy (modified) (1 diff)
- ipython1/trunk/ipython1/core/interpreter.py (modified) (2 diffs)
- ipython1/trunk/ipython1/kernel/enginepb.py (modified) (13 diffs)
- ipython1/trunk/ipython1/kernel/engineservice.py (modified) (9 diffs)
- ipython1/trunk/ipython1/kernel/map.py (modified) (1 diff)
- ipython1/trunk/ipython1/kernel/multiengine.py (modified) (7 diffs)
- ipython1/trunk/ipython1/kernel/multiengineclient.py (modified) (4 diffs)
- ipython1/trunk/ipython1/kernel/multienginehttp.py (modified) (4 diffs)
- ipython1/trunk/ipython1/kernel/multienginexmlrpc.py (modified) (13 diffs)
- ipython1/trunk/ipython1/kernel/pbutil.py (modified) (2 diffs)
- ipython1/trunk/ipython1/kernel/pendingdeferred.py (modified) (1 diff)
- ipython1/trunk/ipython1/kernel/pickleutil.py (modified) (1 diff)
- ipython1/trunk/ipython1/kernel/task.py (modified) (1 diff)
- ipython1/trunk/ipython1/kernel/tests/engineservicetest.py (modified) (4 diffs)
- ipython1/trunk/ipython1/kernel/tests/multienginetest.py (modified) (3 diffs)
- ipython1/trunk/ipython1/kernel/tests/test_multiengine.py (modified) (1 diff)
- ipython1/trunk/ipython1/kernel/twistedutil.py (copied) (copied from ipython1/branches/ipython1-data-r3016/ipython1/kernel/twistedutil.py)
- ipython1/trunk/ipython1/kernel/util.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/trunk/docs/ChangeLog
r2995 r3021 1 2008-02-05 Brian Granger <ellisonbg@gmail.com> 2 3 * ipython1.kernel.enginepb/multienginexmlrpc: Simplified certain 4 interfaces so that they are simply a pass. This was done for 5 interfaces that 1) are not part of the public API and 2) 6 only have one implementation (and always will). 7 8 * ipython1.kernel: Created new pushFunction/pullFunctions in the API 9 of the engines. These functions now have all the needed logic for 10 pushing/pulling functions. The logic has been removed from regular 11 push/pull to make things more robust. 12 13 * ipython1.kernel.multiengine and friends: Removed the 14 IMultiEngineCoordinator interface (gather/scatter) entirely from the 15 controller. These things are now handled by the client using push/pull. 16 This was done to make the load on the controller lighter. 17 1 18 2008-01-28 Brian Granger <ellisonbg@gmail.com> 2 19 ipython1/trunk/docs/examples/rmt.ipy
r2624 r3021 34 34 rc.pushAll(num_per_engine=num_per_engine, N=N) 35 35 rc.executeAll('diffs = ensembleDiffs(num_per_engine, N)') 36 pd = rc.gatherAll('diffs') 37 return pd.getResult(block=True)36 # gather blocks always for now 37 return rc.gatherAll('diffs') 38 38 39 39 ipython1/trunk/ipython1/core/interpreter.py
r2933 r3021 437 437 ---------- 438 438 **kwds 439 """ 440 439 """ 440 441 self.user_ns.update(kwds) 442 443 def pushFunction(self, **kwds): 441 444 # First set the func_globals for all functions to self.user_ns 445 new_kwds = {} 442 446 for k, v in kwds.iteritems(): 443 if isinstance(v, FunctionType):444 kwds[k] = FunctionType(v.func_code, self.user_ns)445 446 self.user_ns.update( kwds)447 if not isinstance(v, FunctionType): 448 raise TypeError("function object expected") 449 new_kwds[k] = FunctionType(v.func_code, self.user_ns) 450 self.user_ns.update(new_kwds) 447 451 448 452 def pack_exception(self,message,exc): 449 453 message['exception'] = exc.__class__ 450 454 message['exception_value'] = \ 451 traceback.format_exception_only(exc.__class__, exc)455 traceback.format_exception_only(exc.__class__, exc) 452 456 453 457 def feed_block(self, source, filename='<input>', symbol='single'): … … 530 534 return result 531 535 536 def pullFunction(self, key): 537 return self.pull(key) 532 538 533 539 #### Interactive user API ################################################## ipython1/trunk/ipython1/kernel/enginepb.py
r2839 r3021 51 51 from ipython1.kernel.pbutil import packageFailure, unpackageFailure, checkMessageSize 52 52 from ipython1.kernel.pbconfig import CHUNK_SIZE 53 from ipython1.kernel.util import gatherBoth 53 from ipython1.kernel.util import printer 54 from ipython1.kernel.twistedutil import gatherBoth 54 55 from ipython1.kernel import newserialized 55 56 from ipython1.kernel.error import PBMessageSizeError, ProtocolError 56 from ipython1.kernel import controllerservice , protocols57 from ipython1.kernel import controllerservice 57 58 from ipython1.kernel.controllerservice import IControllerBase 58 59 from ipython1.kernel.engineservice import \ … … 212 213 will be returned instead. 213 214 """ 214 215 def remote_getID(): 216 """return this.id""" 217 218 def remote_setID(id): 219 """set this.id""" 220 221 def remote_execute(lines): 222 """Execute lines of Python code. 223 224 Returns a deferred to a result dict. 225 226 Upon failure returns a pickled Failure. 227 """ 228 229 def remote_push(self, pNamespace): 230 """Push a namespace into the users namespace. 231 232 pNamespace is a pickled dict of key, object pairs. 233 """ 234 235 def remote_pull(*keys): 236 """Pull objects from a users namespace by keys. 237 238 Returns a deferred to a pickled tuple of objects. If any single 239 key has a problem, the Failure of that will be returned. 240 """ 241 242 def remote_getResult(i=None): 243 """Get result i. 244 245 Returns a deferred to a pickled result dict. 246 """ 247 248 def remote_reset(): 249 """Reset the Engine.""" 250 251 def remote_kill(): 252 """Stop the Engines reactor.""" 253 254 def remote_keys(): 255 """Get variable names that are currently defined in the user's namespace. 256 257 Returns a deferred to a tuple of keys. 258 """ 259 260 def remote_pushSerialized(pNamespace): 261 """Push a dict of keys and serialized objects into users namespace. 262 263 @arg pNamespace: a pickle namespace of keys and serialized objects. 264 """ 265 266 def remote_pullSerialized(*keys): 267 """Pull objects from users namespace by key as Serialized. 268 269 Returns a deferred to a pickled dict of key, Serialized pairs. 270 """ 271 272 def remote_setProperties(pNamespace): 273 """update the properties for this engine""" 274 275 def remote_getProperties(*keys): 276 """pull a subdict of the properties for this engine by keys""" 277 278 def remote_hasProperties(*keys): 279 """check for keys in the properties for this engine""" 280 281 def remote_delProperties(*keys): 282 """remove values of the properties for this engine by keys""" 283 284 def remote_clearProperties(): 285 """clear the properties for this engine""" 286 287 288 215 pass 216 289 217 290 218 class PBEngineReferenceFromService(pb.Referenceable, object): … … 316 244 317 245 def remote_execute(self, lines): 246 """Execute lines of Python code. 247 248 Returns a deferred to a result dict. 249 250 Upon failure returns a pickled Failure. 251 """ 318 252 d = self.service.execute(lines) 319 253 d.addErrback(packageFailure) … … 322 256 #d.addCallback(lambda r: log.msg("Got result: " + str(r))) 323 257 return d 324 325 def remote_setProperties(self, pNamespace): 258 259 #--------------------------------------------------------------------------- 260 # Old version of push 261 #--------------------------------------------------------------------------- 262 263 def remote_push(self, pNamespace): 264 """Push a namespace into the users namespace. 265 266 pNamespace is a pickled dict of key, object pairs. 267 """ 326 268 try: 327 269 namespace = pickle.loads(pNamespace) … … 329 271 return defer.fail(failure.Failure()).addErrback(packageFailure) 330 272 else: 331 return self.service.setProperties(**namespace).addErrback(packageFailure)332 333 def remote_getProperties(self, *keys):334 d = self.service.getProperties(*keys)335 d.addCallback(pickle.dumps, 2)336 d.addCallback(checkMessageSize, repr(keys))337 d.addErrback(packageFailure)338 return d339 340 def remote_hasProperties(self, *keys):341 d = self.service.hasProperties(*keys)342 d.addCallback(pickle.dumps, 2)343 d.addCallback(checkMessageSize, repr(keys))344 d.addErrback(packageFailure)345 return d346 347 def remote_delProperties(self, *keys):348 d = self.service.delProperties(*keys)349 # d.addCallback(pickle.dumps, 2)350 # d.addCallback(checkMessageSize, repr(keys))351 d.addErrback(packageFailure)352 return d353 354 def remote_clearProperties(self):355 d = self.service.clearProperties()356 # d.addCallback(pickle.dumps, 2)357 # d.addCallback(checkMessageSize, repr(keys))358 d.addErrback(packageFailure)359 return d360 361 362 #---------------------------------------------------------------------------363 # Old version of push364 #---------------------------------------------------------------------------365 366 def remote_push(self, pNamespace):367 try:368 namespace = pickle.loads(pNamespace)369 except:370 return defer.fail(failure.Failure()).addErrback(packageFailure)371 else:372 # The usage of globals() here is an attempt to bind any pickled functions373 # to the globals of this module. What we really want is to have it bound374 # to the globals of the callers module. This will require walking the375 # stack. BG 10/3/07.376 namespace = uncanDict(namespace, globals())377 273 return self.service.push(**namespace).addErrback(packageFailure) 378 274 … … 417 313 # pull 418 314 #--------------------------------------------------------------------------- 419 315 420 316 def remote_pull(self, *keys): 317 """Pull objects from a users namespace by keys. 318 319 Returns a deferred to a pickled tuple of objects. If any single 320 key has a problem, the Failure of that will be returned. 321 """ 421 322 d = self.service.pull(*keys) 323 d.addCallback(pickle.dumps, 2) 324 d.addCallback(checkMessageSize, repr(keys)) 325 d.addErrback(packageFailure) 326 return d 327 328 # NOTE: The paging version of pull is not being used right now (BG 1/15/07). 329 330 def remote_pullPaging(self, key, collector): 331 d = self.service.pull(key) 332 d.addCallback(newserialized.serialize) 333 d.addCallbacks(self._startPaging, packageFailure, callbackArgs=(collector,)) 334 return d 335 336 def _startPaging(self, serial, collector): 337 pager = SerializedPager(collector, serial, chunkSize=CHUNK_SIZE) 338 339 #--------------------------------------------------------------------------- 340 # push/pullFuction 341 #--------------------------------------------------------------------------- 342 343 def remote_pushFunction(self, pNamespace): 344 try: 345 namespace = pickle.loads(pNamespace) 346 except: 347 return defer.fail(failure.Failure()).addErrback(packageFailure) 348 else: 349 # The usage of globals() here is an attempt to bind any pickled functions 350 # to the globals of this module. What we really want is to have it bound 351 # to the globals of the callers module. This will require walking the 352 # stack. BG 10/3/07. 353 namespace = uncanDict(namespace, globals()) 354 return self.service.pushFunction(**namespace).addErrback(packageFailure) 355 356 def remote_pullFunction(self, *keys): 357 d = self.service.pullFunction(*keys) 422 358 if len(keys)>1: 423 359 d.addCallback(canSequence) … … 428 364 d.addErrback(packageFailure) 429 365 return d 430 431 # NOTE: The paging version of pull is not being used right now (BG 1/15/07). 432 433 def remote_pullPaging(self, key, collector): 434 d = self.service.pull(key) 435 d.addCallback(newserialized.serialize) 436 d.addCallbacks(self._startPaging, packageFailure, callbackArgs=(collector,)) 437 return d 438 439 def _startPaging(self, serial, collector): 440 pager = SerializedPager(collector, serial, chunkSize=CHUNK_SIZE) 441 366 442 367 #--------------------------------------------------------------------------- 443 368 # Other methods … … 445 370 446 371 def remote_getResult(self, i=None): 372 """Get result i. 373 374 Returns a deferred to a pickled result dict. 375 """ 447 376 return self.service.getResult(i).addErrback(packageFailure) 448 377 449 378 def remote_reset(self): 379 """Reset the Engine.""" 450 380 return self.service.reset().addErrback(packageFailure) 451 381 452 382 def remote_kill(self): 383 """Stop the Engines reactor.""" 453 384 return self.service.kill().addErrback(packageFailure) 454 385 455 386 def remote_keys(self): 387 """Get variable names that are currently defined in the user's namespace. 388 389 Returns a deferred to a tuple of keys. 390 """ 456 391 return self.service.keys().addErrback(packageFailure) 457 392 … … 461 396 462 397 def remote_pushSerialized(self, pNamespace): 398 """Push a dict of keys and serialized objects into users namespace. 399 400 @arg pNamespace: a pickle namespace of keys and serialized objects. 401 """ 463 402 try: 464 403 namespace = pickle.loads(pNamespace) … … 470 409 471 410 def remote_pullSerialized(self, *keys): 411 """Pull objects from users namespace by key as Serialized. 412 413 Returns a deferred to a pickled dict of key, Serialized pairs. 414 """ 472 415 d = self.service.pullSerialized(*keys) 473 416 d.addCallback(pickle.dumps, 2) 474 417 d.addCallback(checkMessageSize, repr(keys)) 418 d.addErrback(packageFailure) 419 return d 420 421 #--------------------------------------------------------------------------- 422 # Properties interface 423 #--------------------------------------------------------------------------- 424 425 def remote_setProperties(self, pNamespace): 426 """Update the properties for this engine""" 427 try: 428 namespace = pickle.loads(pNamespace) 429 except: 430 return defer.fail(failure.Failure()).addErrback(packageFailure) 431 else: 432 return self.service.setProperties(**namespace).addErrback(packageFailure) 433 434 def remote_getProperties(self, *keys): 435 """Pull a subdict of the properties for this engine by keys""" 436 d = self.service.getProperties(*keys) 437 d.addCallback(pickle.dumps, 2) 438 d.addCallback(checkMessageSize, repr(keys)) 439 d.addErrback(packageFailure) 440 return d 441 442 def remote_hasProperties(self, *keys): 443 """Check for keys in the properties for this engine.""" 444 d = self.service.hasProperties(*keys) 445 d.addCallback(pickle.dumps, 2) 446 d.addCallback(checkMessageSize, repr(keys)) 447 d.addErrback(packageFailure) 448 return d 449 450 def remote_delProperties(self, *keys): 451 """Remove values of the properties for this engine by keys.""" 452 d = self.service.delProperties(*keys) 453 # d.addCallback(pickle.dumps, 2) 454 # d.addCallback(checkMessageSize, repr(keys)) 455 d.addErrback(packageFailure) 456 return d 457 458 def remote_clearProperties(self): 459 """Clear the properties for this engine.""" 460 d = self.service.clearProperties() 461 # d.addCallback(pickle.dumps, 2) 462 # d.addCallback(checkMessageSize, repr(keys)) 475 463 d.addErrback(packageFailure) 476 464 return d … … 571 559 def pushOld(self, **namespace): 572 560 try: 573 package = pickle.dumps( canDict(namespace), 2)561 package = pickle.dumps(namespace, 2) 574 562 except: 575 563 return defer.fail(failure.Failure()) … … 619 607 # pull 620 608 #--------------------------------------------------------------------------- 621 609 622 610 def pullOld(self, *keys): 623 611 d = self.callRemote('pull', *keys) 624 612 d.addCallback(self.checkReturnForFailure) 625 613 d.addCallback(pickle.loads) 626 # The usage of globals() here is an attempt to bind any pickled functions627 # to the globals of this module. What we really want is to have it bound628 # to the globals of the callers module. This will require walking the629 # stack. BG 10/3/07.630 if len(keys)>1:631 d.addCallback(uncanSequence, globals())632 elif len(keys)>0:633 d.addCallback(uncan, globals())634 614 return d 635 615 … … 658 638 659 639 pull = pullOld 640 641 #--------------------------------------------------------------------------- 642 # push/pullFunction 643 #--------------------------------------------------------------------------- 644 645 def pushFunction(self, **namespace): 646 try: 647 package = pickle.dumps(canDict(namespace), 2) 648 except: 649 return defer.fail(failure.Failure()) 650 else: 651 package = checkMessageSize(package, namespace.keys()) 652 if isinstance(package, failure.Failure): 653 return defer.fail(package) 654 else: 655 d = self.callRemote('pushFunction', package) 656 return d.addCallback(self.checkReturnForFailure) 657 658 def pullFunction(self, *keys): 659 d = self.callRemote('pullFunction', *keys) 660 d.addCallback(self.checkReturnForFailure) 661 d.addCallback(pickle.loads) 662 # The usage of globals() here is an attempt to bind any pickled functions 663 # to the globals of this module. What we really want is to have it bound 664 # to the globals of the callers module. This will require walking the 665 # stack. BG 10/3/07. 666 if len(keys)==1: 667 d.addCallback(uncan, globals()) 668 elif len(keys)>1: 669 d.addCallback(uncanSequence, globals()) 670 return d 660 671 661 672 #--------------------------------------------------------------------------- ipython1/trunk/ipython1/kernel/engineservice.py
r2866 r3021 45 45 from ipython1.core.interpreter import Interpreter 46 46 from ipython1.kernel import newserialized, error, util 47 from ipython1.kernel. util import gatherBoth, DeferredList47 from ipython1.kernel.twistedutil import gatherBoth, DeferredList 48 48 from ipython1.kernel import codeutil 49 from ipython1.kernel.pickleutil import can, uncan50 49 51 50 … … 93 92 """Pulls values out of the user's namespace by keys. 94 93 95 Returns a deferred to tuple objects or a single object. 96 97 Raises NameError is any one of objects does not exist. 94 Returns a deferred to a tuple objects or a single object. 95 96 Raises NameError if any one of objects doess not exist. 97 """ 98 99 def pushFunction(**namespace): 100 """Push a dict of key, function pairs into the user's namespace. 101 102 Returns a deferred to None or a failure.""" 103 104 def pullFunction(*keys): 105 """Pulls functions out of the user's namespace by keys. 106 107 Returns a deferred to a tuple of functions or a single function. 108 109 Raises NameError if any one of the functions does not exist. 98 110 """ 99 111 … … 205 217 206 218 207 208 209 219 #------------------------------------------------------------------------------- 210 220 # Functions and classes to implement the EngineService 211 221 #------------------------------------------------------------------------------- 222 223 212 224 class StrictDict(dict): 213 225 """This is a strict copying dictionary for use as the interface to the … … 403 415 method: pull(*keys) 404 416 keys = %r""" % (self.id, keys) 405 if len(keys) > 1: 417 418 if len(keys)==1: 419 return self.executeAndRaise(msg, self.shell.pull, keys[0]) 420 elif len(keys) > 1: 406 421 pulledDeferreds = [] 407 422 for key in keys: … … 414 429 consumeErrors=1) 415 430 return dTotal 416 elif len(keys) > 0:417 return self.executeAndRaise(msg, self.shell.pull, keys[0])418 431 else: 419 432 return self.executeAndRaise(msg, self.shell.pull, None) 433 434 def pushFunction(self, **namespace): 435 msg = """engine: %r 436 method: pushFunction(**namespace) 437 namespace.keys() = %r""" % (self.id, namespace.keys()) 438 d = self.executeAndRaise(msg, self.shell.pushFunction, **namespace) 439 return d 440 441 def pullFunction(self, *keys): 442 msg = """engine %r 443 method: pullFunction(*keys) 444 keys = %r""" % (self.id, keys) 445 if len(keys)==1: 446 return self.executeAndRaise(msg, self.shell.pullFunction, keys[0]) 447 elif len(keys) > 1: 448 pulledDeferreds = [] 449 for key in keys: 450 d = self.executeAndRaise(msg, self.shell.pullFunction, key) 451 pulledDeferreds.append(d) 452 # This will fire on the first failure and log the rest. 453 dTotal = gatherBoth(pulledDeferreds, 454 fireOnOneErrback=1, 455 logErrors=1, 456 consumeErrors=1) 457 return dTotal 458 else: 459 return self.executeAndRaise(msg, self.shell.pullFunction, None) 420 460 421 461 def getResult(self, i=None): … … 507 547 try: 508 548 unserialized = newserialized.IUnSerialized(v) 509 # The usage of globals() here is an attempt to bind any pickled functions 510 # to the globals of this module. What we really want is to have it bound 511 # to the globals of the callers module. This will require walking the 512 # stack. BG 10/3/07. 513 ns[k] = uncan(unserialized.getObject(),globals()) 549 ns[k] = unserialized.getObject() 514 550 except: 515 551 return defer.fail() … … 520 556 method: pullSerialized(*keys) 521 557 keys = %r""" % (self.id, keys) 522 if len(keys) > 1: 558 if len(keys)==1: 559 key = keys[0] 560 d = self.executeAndRaise(msg, self.shell.pull, key) 561 d.addCallback(newserialized.serialize) 562 return d 563 elif len(keys)>1: 523 564 pulledDeferreds = [] 524 565 for key in keys: 525 d = self.executeAndRaise(msg, self.shell.pull, key)566 d = self.executeAndRaise(msg, self.shell.pull, key) 526 567 pulledDeferreds.append(d) 527 568 # This will fire on the first failure and log the rest. … … 535 576 for v in values: 536 577 try: 537 serials = newserialized.serialize(v)578 serials.append(newserialized.serialize(v)) 538 579 except: 539 580 return defer.fail(failure.Failure()) 540 return dict(zip(keys, values))581 return dict(zip(keys, serials)) 541 582 return packThemUp 542 else: 543 key = keys[0] 544 d = self.executeAndRaise(msg, self.shell.pull, key) 545 d.addCallback(newserialized.serialize) 546 return d 547 548 583 584 549 585 def queue(methodToQueue): 550 586 def queuedMethod(this, *args, **kwargs): … … 685 721 pass 686 722 723 @queue 724 def pushFunction(self, **namespace): 725 pass 726 727 @queue 728 def pullFunction(self, *keys): 729 pass 730 687 731 def getResult(self, i=None): 688 732 if i is None: ipython1/trunk/ipython1/kernel/map.py
r2199 r3021 89 89 return genutil_flatten(listOfPartitions) 90 90 # If we have scalars, just return listOfPartitions 91 return listOfPartitions 91 return listOfPartitions 92 92 93 93 class RoundRobinMap(Map): ipython1/trunk/ipython1/kernel/multiengine.py
r2837 r3021 33 33 from zope.interface import Interface, implements, Attribute 34 34 35 from ipython1.kernel.util import gatherBoth36 from ipython1.kernel import map as Map35 from ipython1.kernel.util import printer 36 from ipython1.kernel.twistedutil import gatherBoth 37 37 from ipython1.kernel import error 38 38 from ipython1.kernel.controllerservice import \ … … 126 126 def pullAll(*keys): 127 127 """Pull from all targets.""" 128 129 def pushFunction(targets, **namespace): 130 """""" 131 132 def pushFunctionAll(**namespace): 133 """""" 134 135 def pullFunction(targets, *keys): 136 """""" 137 138 def pullFunctionAll(*keys): 139 """""" 128 140 129 141 def getResult(targets, i=None): … … 210 222 """get all the properties dicts.""" 211 223 212 213 class IEngineCoordinator(Interface): 214 """Methods that work on multiple engines explicitly.""" 215 216 #--------------------------------------------------------------------------- 217 # Coordinated methods 218 #--------------------------------------------------------------------------- 219 220 def scatter(targets, key, seq, style='basic', flatten=False): 221 """Partition and distribute a sequence to targets. 222 223 :Parameters: 224 key : str 225 The variable name to call the scattered sequence. 226 seq : list, tuple, array 227 The sequence to scatter. The type should be preserved. 228 style : string 229 A specification of how the sequence is partitioned. Currently 230 only 'basic' is implemented. 231 flatten : boolean 232 Should single element sequences be converted to scalars. 233 """ 234 235 def scatterAll(key, seq, style='basic', flatten=False): 236 """Scatter to all targets.""" 237 238 def gather(targets, key, style='basic'): 239 """Gather object key from targets. 240 241 :Parameters: 242 key : string 243 The name of a sequence on the targets to gather. 244 style : string 245 A specification of how the sequence is partitioned. Currently 246 only 'basic' is implemented. 247 """ 248 249 def gatherAll(key, style='basic'): 250 """Gather from all targets.""" 251 252 253 class IMultiEngine(IEngineMultiplexer, 254 IEngineCoordinator): 224 225 class IMultiEngine(IEngineMultiplexer): 255 226 """A controller that exposes an explicit interface to all of its engines. 256 227 … … 400 371 return self.pull('all', *keys) 401 372 373 def pushFunction(self, targets, **ns): 374 return self._performOnEnginesAndGatherBoth('pushFunction', targets, **ns) 375 376 def pushFunctionAll(self, **ns): 377 return self.pushFunction('all', **ns) 378 379 def pullFunction(self, targets, *keys): 380 return self._performOnEnginesAndGatherBoth('pullFunction', targets, *keys) 381 382 def pullFunctionAll(self, *keys): 383 return self.pullFunction('all', *keys) 384 402 385 def getResult(self, targets, i=None): 403 386 return self._performOnEnginesAndGatherBoth('getResult', targets, i) … … 569 552 return self.getProperties('all') 570 553 571 #---------------------------------------------------------------------------572 # IEngineCoordinator methods573 #---------------------------------------------------------------------------574 575 def scatter(self, targets, key, seq, style='basic', flatten=False):576 log.msg("Scattering %r to %r" % (key, targets))577 try:578 engines = self.engineList(targets)579 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):580 return defer.fail(failure.Failure())581 else:582 nEngines = len(engines)583 mapClass = Map.styles[style]584 mapObject = mapClass()585 dList = []586 for index, engine in enumerate(engines):587 partition = mapObject.getPartition(seq, index, nEngines)588 if flatten and len(partition) == 1:589 dList.append(engine.push(**{key: partition[0]}))590 else:591 dList.append(engine.push(**{key: partition}))592 return gatherBoth(dList,593 fireOnOneErrback=1,594 consumeErrors=1,595 logErrors=0)596 597 def scatterAll(self, key, seq, style='basic', flatten=False):598 return self.scatter('all', key, seq, style, flatten)599 600 def gather(self, targets, key, style='basic'):601 """gather a distributed object, and reassemble it"""602 log.msg("Gathering %s from %r" % (key, targets))603 try:604 engines = self.engineList(targets)605 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):606 return defer.fail(failure.Failure())607 else:608 nEngines = len(engines)609 dList = []610 for e in engines:611 dList.append(e.pull(key))612 mapClass = Map.styles[style]613 mapObject = mapClass()614 d = gatherBoth(dList,615 fireOnOneErrback=1,616 consumeErrors=1,617 logErrors=0)618 return d.addCallback(mapObject.joinPartitions)619 620 def gatherAll(self, key, style='basic'):621 return self.gather('all', key, style)622 623 554 624 555 components.registerAdapter(MultiEngine, … … 674 605 @twoPhase 675 606 def pull(self, targets, *keys): 676 return self.multiengine.pull(targets, *keys) 677 607 d = self.multiengine.pull(targets, *keys) 608 return d 609 610 @twoPhase 611 def pushFunction(self, targets, **namespace): 612 return self.multiengine.pushFunction(targets, **namespace) 613 614 @twoPhase 615 def pullFunction(self, targets, *keys): 616 d = self.multiengine.pullFunction(targets, *keys) 617 return d 618 678 619 @twoPhase 679 620 def getResult(self, targets, i=None): … … 728 669 return self.multiengine.clearProperties(targets, *keys) 729 670 730 @twoPhase731 def scatter(self, targets, key, seq, style='basic', flatten=False):732 return self.multiengine.scatter(targets, key, seq, style, flatten)733 734 @twoPhase735 def gather(self, targets, key, style='basic'):736 return self.multiengine.gather(targets, key, style)737 738 671 #--------------------------------------------------------------------------- 739 672 # IMultiEngine methods ipython1/trunk/ipython1/kernel/multiengineclient.py
r2839 r3021 30 30 from ipython1.kernel import error 31 31 from ipython1.kernel.parallelfunction import ParallelFunction 32 from ipython1.kernel import map as Map 32 33 33 34 … … 422 423 if self.block: 423 424 if isinstance(func, FunctionType): 424 self.push (targets, _ipython_map_func=func)425 self.pushFunction(targets, _ipython_map_func=func) 425 426 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' 426 427 elif isinstance(func, str): … … 435 436 else: 436 437 if isinstance(func, FunctionType): 437 pd1 = self.push (targets, _ipython_map_func=func)438 pd1 = self.pushFunction(targets, _ipython_map_func=func) 438 439 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' 439 440 elif isinstance(func, str): … … 489 490 """ 490 491 return self.parallelize('all', functionName) 492 493 494 #------------------------------------------------------------------------------- 495 # MultiEngineCoordinator 496 #------------------------------------------------------------------------------- 497 498 class MultiEngineCoordinator(object): 499 500 def scatter(self, targets, key, seq, style='basic', flatten=False): 501 """Partition and distribute a sequence to a set of targets/engines. 502 503 This method partitions a Python sequence and then pushes the partitions 504 to a set of engines. 505 506 :Parameters: 507 targets : int, list or 'all' 508 The engine ids the action will apply to. Call `getIDs` to see 509 a list of currently available engines. 510 key : str 511 What to call the partitions on the engines. 512 seq : list, tuple or numpy array 513 The sequence to be partitioned and pushed. 514 style : str 515 The style of partitioning to use. Only 'basic' is supported 516 flatten : boolean 517 Should length 1 partitions be flattened to scalars upon pushing. 518 """ 519 self._checkClientID() 520 localBlock = self._reallyBlock() 521 if targets=='all': 522 engines = self.getIDs() 523 else: 524 engines = targets 525 nEngines = len(engines) 526 mapClass = Map.styles[style] 527 mapObject = mapClass() 528 results = [] 529 for index, engine in enumerate(engines): 530 partition = mapObject.getPartition(seq, index, nEngines) 531 if flatten and len(partition) == 1: 532 results.append(self.push(engine, **{key: partition[0]})) 533 else: 534 results.append(self.push(engine, **{key: partition})) 535 if not localBlock: 536 self.barrier(*results) 537 return nEngines*[None] 538 539 def scatterAll(self, key, seq, style='basic', flatten=False): 540 """Partition and distribute a sequence to all targets/engines. 541 542 See the docstring for `scatter` for full details. 543 """ 544 return self.scatter('all', key, seq, style, flatten) 545 546 scatter_all = scatterAll 547 548 def gather(self, targets, key, style='basic'): 549 """Gather a set of sequence partitions that are distributed on targets. 550 551 This method is the inverse of `scatter`
