Changeset 3041
- Timestamp:
- 02/12/08 15:57:37 (10 months ago)
- Files:
-
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py (modified) (5 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengineclient.py (modified) (8 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/multienginetest.py (modified) (4 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multiengine.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multienginexmlrpc.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py
r3037 r3041 787 787 788 788 789 class MultiEngineCoordinator(object): 789 #------------------------------------------------------------------------------- 790 # ISynchronousMultiEngineCoordinator/ITwoPhaseMultiEngineCoordinator 791 #------------------------------------------------------------------------------- 792 793 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): 794 """Methods that work on multiple engines explicitly.""" 795 pass 796 797 798 class ITwoPhaseMultiEngineCoordinator(IMultiEngineCoordinator): 799 """Methods that work on multiple engines explicitly.""" 800 pass 801 802 803 class TwoPhaseMultiEngineCoordinator(object): 790 804 """Mix in class for scater/gather. 791 805 792 This can be mixed in with any I MultiEngine implementer.806 This can be mixed in with any ITwoPhaseMultiEngine implementer. 793 807 """ 794 808 795 implements(I MultiEngineCoordinator)809 implements(ITwoPhaseMultiEngineCoordinator) 796 810 797 811 def _process_targets(self, targets): … … 808 822 raise error.InvalidEngineID("engine with id %r does not exist"%t) 809 823 return engines 810 811 d = self.getIDs() 812 d.addCallback(create_targets) 813 return d 814 815 def scatter(self, targets, key, seq, style='basic', flatten=False): 816 log.msg("Scattering %r to %r" % (key, targets)) 817 818 def do_scatter(engines): 819 nEngines = len(engines) 820 mapClass = Map.styles[style] 821 mapObject = mapClass() 822 dList = [] 823 for index, engineid in enumerate(engines): 824 partition = mapObject.getPartition(seq, index, nEngines) 825 if flatten and len(partition) == 1: 826 dList.append(self.push(engineid, **{key: partition[0]})) 827 else: 828 dList.append(self.push(engineid, **{key: partition})) 829 return gatherBoth(dList, 830 fireOnOneErrback=1, 831 consumeErrors=1, 832 logErrors=0) 833 834 d = self._process_targets(targets) 835 d.addCallback(do_scatter) 836 return d 837 838 def scatterAll(self, key, seq, style='basic', flatten=False): 839 return self.scatter('all', key, seq, style, flatten) 840 841 def gather(self, targets, key, style='basic'): 842 """gather a distributed object, and reassemble it""" 843 log.msg("Gathering %s from %r" % (key, targets)) 844 845 def do_gather(engines): 846 nEngines = len(engines) 847 dList = [] 848 for engineid in engines: 849 dList.append(self.pull(engineid, key)) 850 mapClass = Map.styles[style] 851 mapObject = mapClass() 852 d = gatherBoth(dList, 853 fireOnOneErrback=1, 854 consumeErrors=1, 855 logErrors=0) 856 d.addCallback(lambda lop: [i[0] for i in lop]) 857 return d.addCallback(mapObject.joinPartitions) 858 d = self._process_targets(targets) 859 d.addCallback(do_gather) 860 return d 861 862 def gatherAll(self, key, style='basic'): 863 return self.gather('all', key, style) 864 865 def map(self, targets, func, seq, style='basic'): 866 867 if isinstance(func, FunctionType): 868 d = self.pushFunction(targets, _ipython_map_func=func) 869 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' 870 elif isinstance(func, str): 871 d = defer.succeed(None) 872 sourceToRun = \ 873 '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % \ 874 func 875 else: 876 raise TypeError("func must be a function or str") 877 878 d.addCallback(lambda _: self.scatter(targets, '_ipython_map_seq', seq, style)) 879 d.addCallback(lambda _: self.execute(targets, sourceToRun)) 880 d.addCallback(lambda _: self.gather(targets, '_ipython_map_seq_result', style)) 881 return d 882 883 def mapAll(self, func, seq, style='basic'): 884 """Parallel map on all engines. 885 886 See the docstring for `map` for more details. 887 """ 888 return self.map('all', func, seq, style) 889 890 891 892 #------------------------------------------------------------------------------- 893 # ISynchronousMultiEngineCoordinator/ITwoPhaseMultiEngineCoordinator 894 #------------------------------------------------------------------------------- 895 896 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): 897 """Methods that work on multiple engines explicitly.""" 898 pass 899 900 901 class ITwoPhaseMultiEngineCoordinator(IMultiEngineCoordinator): 902 """Methods that work on multiple engines explicitly.""" 903 pass 904 905 906 class TwoPhaseMultiEngineCoordinator(object): 907 """Mix in class for scater/gather. 908 909 This can be mixed in with any ITwoPhaseMultiEngine implementer. 910 """ 911 912 implements(ITwoPhaseMultiEngineCoordinator) 913 914 def _process_targets(self, targets): 915 916 def create_targets(ids): 917 if isinstance(targets, int): 918 engines = [targets] 919 elif targets=='all': 920 engines = ids 921 elif isinstance(targets, (list, tuple)): 922 engines = targets 923 for t in engines: 924 if not t in ids: 925 raise error.InvalidEngineID("engine with id %r does not exist"%t) 926 return engines 927 d = self.smultiengine.getIDs(True) 824 d = self.smultiengine.getIDs() 928 825 d.addCallback(create_targets) 929 826 return d … … 993 890 raise TypeError("func must be a function or str") 994 891 995 d.addCallback(self.scatter('_ipython_map_seq', seq, style, targets=targets)) 892 d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets)) 893 d.addCallback(lambda _: self.smultiengine.execute(sourceToRun, targets=targets, block=False)) 996 894 d.addCallback(lambda did: self.smultiengine.getPendingDeferred(did, True)) 997 d.addCallback(self.smultiengine.execute(sourceToRun, targets=targets, block=False)) 998 d.addCallback(self.gather('_ipython_map_seq_result', style, targets=targets)) 999 d.addCallback(lambda did: self.smultiengine.getPendingDeferred(did, True)) 895 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets)) 1000 896 return d 1001 897 … … 1025 921 :Returns: A list of pulled Python objects for each target. 1026 922 """ 1027 1028 def zipPullAll(*keys):1029 """"""1030 923 1031 924 def run(targets, fname): … … 1046 939 will be used instead. 1047 940 """ 1048 1049 def runAll(fname):1050 """Run a .py file on all engines.1051 1052 See the docstring for `run` for more details.1053 """1054 1055 1056 class MultiEngineExtras(object):1057 1058 implements(IMultiEngineExtras)1059 1060 def _transformPullResult(self, pushResult, multitargets, lenKeys):1061 if not multitargets:1062 result = pushResult[0]1063 elif lenKeys > 1:1064 result = zip(*pushResult)1065 elif lenKeys is 1:1066 result = list(pushResult)1067 return result1068 1069 def zipPull(self, keys, targets='all'):1070 d = self.pull(keys, targets=targets)1071 multitargets = not isinstance(targets, int) and len(targets) > 11072 lenKeys = len(keys)1073 d.addCallback(self._transformPullResult, multitargets, lenKeys)1074 return d1075 1076 def run(self, fname, targets='all'):1077 fileobj = open(fname,'r')1078 source = fileobj.read()1079 fileobj.close()1080 # if the compilation blows, we get a local error right away1081 try:1082 code = compile(source,fname,'exec')1083 except:1084 return defer.fail(failure.Failure())1085 # Now run the code1086 return self.execute(source, targets)1087 941 1088 942 ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengineclient.py
r3037 r3041 52 52 53 53 resultID=Attribute("ID of the deferred on the other side") 54 client=Attribute("A n ISynchronousMultiEngineClient that I came from")54 client=Attribute("A client that I came from") 55 55 r=Attribute("An attribute that is a property that calls and returns getResult") 56 56 … … 79 79 catch these by hand when calling `getResult`. 80 80 """ 81 81 82 82 83 class PendingResult(object): … … 170 171 def _get_r(self): 171 172 return self.getResult(block=True) 172 173 173 174 r = property(_get_r) 174 175 """This property is a shortcut to a `getResult(block=True)`.""" 175 176 176 177 177 178 #------------------------------------------------------------------------------- 178 179 # Pretty printing wrappers for certain lists … … 210 211 red, cmd_num, normal, cmd_stderr)) 211 212 return ''.join(output) 212 213 214 213 215 def wrapResultList(result): 214 216 """A function that wraps the output of `execute`/`getResult` -> `ResultList`.""" … … 216 218 result = [result] 217 219 return ResultList(result) 218 220 221 219 222 class QueueStatusList(list): 220 223 """A subclass of list that pretty prints the output of `queueStatus`.""" 221 224 222 225 def __repr__(self): 223 226 output = [] … … 320 323 functionName : str 321 324 A Python string that names a callable defined on the engines. 322 325 323 326 :Returns: A `ParallelFunction` object. 324 327 325 328 Examples 326 329 ======== … … 332 335 targets, block = self._findTargetsAndBlock(targets, block) 333 336 return ParallelFunction(func, self, targets, block) 334 335 337 336 338 … … 361 363 else: 362 364 raise ValueError("block must be True or False") 363 365 364 366 def _findTargets(self, targets=None): 365 367 if targets is None: ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/multienginetest.py
r3037 r3041 31 31 from ipython1.core.interpreter import Interpreter 32 32 33 34 #------------------------------------------------------------------------------- 35 # Base classes and utilities 36 #------------------------------------------------------------------------------- 37 33 38 class IMultiEngineBaseTestCase(object): 34 39 """Basic utilities for working with multiengine tests. … … 71 76 72 77 78 #------------------------------------------------------------------------------- 79 # IMultiEngineTestCase 80 #------------------------------------------------------------------------------- 73 81 74 82 class IMultiEngineTestCase(IMultiEngineBaseTestCase): … … 353 361 354 362 363 #------------------------------------------------------------------------------- 364 # ISynchronousMultiEngineTestCase 365 #------------------------------------------------------------------------------- 366 355 367 class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase): 356 368 … … 550 562 551 563 552 553 class ITwoPhaseMultiEngineTestCase(IMultiEngineBaseTestCase): 564 class ITwoPhaseMultiEngineTestCase(IMultiEngineTestCase): 565 """From an interface perspective, this is just an IMultiEngine.""" 554 566 pass 555 567 556 568 557 class IMultiEngineCoordinator(IMultiEngineBaseTestCase): 569 #------------------------------------------------------------------------------- 570 # Coordinator test cases 571 #------------------------------------------------------------------------------- 572 573 class IMultiEngineCoordinatorTestCase(object): 574 575 def testScatterGather(self): 576 self.addEngine(4) 577 d = self.multiengine.scatter('a', range(16)) 578 d.addCallback(lambda r: self.multiengine.gather('a')) 579 d.addCallback(lambda r: self.assertEquals(r, range(16))) 580 return d 581 582 def testScatterGatherNumpy(self): 583 try: 584 import numpy 585 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal 586 except: 587 return 588 else: 589 self.addEngine(4) 590 a = numpy.arange(16) 591 d = self.multiengine.scatter('a', a) 592 d.addCallback(lambda r: self.multiengine.gather('a')) 593 d.addCallback(lambda r: assert_array_equal(r, a)) 594 return d 595 596 def testMap(self): 597 self.addEngine(4) 598 def f(x): 599 return x**2 600 data = range(16) 601 d = self.multiengine.map(f, data) 602 d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data])) 603 return d 604 605 606 class ITwoPhaseMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase): 558 607 pass 559 560 561 class ITwoPhaseMultiEngineCoordinator(IMultiEngineBaseTestCase): 608 609 610 611 class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase): 612 613 def testScatterGatherNonblocking(self): 614 self.addEngine(4) 615 d = self.multiengine.scatter('a', range(16), block=False) 616 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 617 d.addCallback(lambda r: self.multiengine.gather('a', block=False)) 618 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 619 d.addCallback(lambda r: self.assertEquals(r, range(16))) 620 return d 621 622 def testScatterGatherNumpyNonblocking(self): 623 try: 624 import numpy 625 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal 626 except: 627 return 628 else: 629 self.addEngine(4) 630 a = numpy.arange(16) 631 d = self.multiengine.scatter('a', a, block=False) 632 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 633 d.addCallback(lambda r: self.multiengine.gather('a', block=False)) 634 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 635 d.addCallback(lambda r: assert_array_equal(r, a)) 636 return d 637 638 def testMapNonblocking(self): 639 self.addEngine(4) 640 def f(x): 641 return x**2 642 data = range(16) 643 d = self.multiengine.map(f, data, block=False) 644 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 645 d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data])) 646 return d 647 648 649 #------------------------------------------------------------------------------- 650 # Extras test cases 651 #------------------------------------------------------------------------------- 652 653 class IMultiEngineExtrasTestCase(object): 654 655 def testZipPull(self): 656 self.addEngine(4) 657 d = self.multiengine.push(dict(a=10,b=20)) 658 d.addCallback(lambda r: self.multiengine.zipPull(('a','b'))) 659 d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]])) 660 return d 661 662 def testRun(self): 663 self.addEngine(4) 664 import tempfile 665 fname = tempfile.mktemp('foo.py') 666 f = open(fname, 'w') 667 f.write('a = 10\nb=30') 668 f.close() 669 d = self.multiengine.run(fname) 670 d.addCallback(lambda r: self.multiengine.pull(('a','b'))) 671 d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]])) 672 return d 673 674 675 class ITwoPhaseMultiEngineExtras(IMultiEngineExtrasTestCase): 562 676 pass 563 677 564 678 565 class IMultiEngineExtras(IMultiEngineBaseTestCase): 679 class ISynchronousMultiEngineExtrasTestCase(IMultiEngineExtrasTestCase): 680 681 def testZipPullNonblocking(self): 682 self.addEngine(4) 683 d = self.multiengine.push(dict(a=10,b=20)) 684 d.addCallback(lambda r: self.multiengine.zipPull(('a','b'), block=False)) 685 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 686 d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]])) 687 return d 688 689 def testRunNonblocking(self): 690 self.addEngine(4) 691 import tempfile 692 fname = tempfile.mktemp('foo.py') 693 f = open(fname, 'w') 694 f.write('a = 10\nb=30') 695 f.close() 696 d = self.multiengine.run(fname, block=False) 697 d.addCallback(lambda did: self.multiengine.getPendingDeferred(did, True)) 698 d.addCallback(lambda r: self.multiengine.pull(('a','b'))) 699 d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]])) 700 return d 701 702 703 #------------------------------------------------------------------------------- 704 # Full interfaces 705 #------------------------------------------------------------------------------- 706 707 class IFullTwoPhaseMultiEngineTestCase(IMultiEngineTestCase, 708 ITwoPhaseMultiEngineCoordinatorTestCase, 709 ITwoPhaseMultiEngineExtras): 566 710 pass 567 711 568 712 569 class ITwoPhaseMultiEngineExtras(IMultiEngineBaseTestCase): 713 class IFullSynchronousTwoPhaseMultiEngineTestCase(ISynchronousMultiEngineTestCase, 714 ISynchronousMultiEngineCoordinatorTestCase, 715 ISynchronousMultiEngineExtrasTestCase): 570 716 pass 571 717 572 718 573 class IFullTwoPhaseMultiEngine(IMultiEngineBaseTestCase): 574 pass 575 576 577 class IFullSynchronousTwoPhaseMultiEngine(IMultiEngineBaseTestCase): 578 pass 579 580 581 582 583 719 720 721 ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multiengine.py
r3028 r3041 20 20 from ipython1.kernel.controllerservice import ControllerService 21 21 from ipython1.kernel import multiengine as me 22 from ipython1.kernel.tests.multienginetest import \ 23 IMultiEngineTestCase, \ 24 ISynchronousMultiEngineTestCase 22 from ipython1.kernel.tests.multienginetest import (IMultiEngineTestCase, 23 ISynchronousMultiEngineTestCase, 24 IFullTwoPhaseMultiEngineTestCase, 25 IFullSynchronousTwoPhaseMultiEngineTestCase) 25 26 26 27 27 class BasicMultiEngineTestCase(DeferredTestCase, 28 IMultiEngineTestCase): 28 class BasicMultiEngineTestCase(DeferredTestCase, IMultiEngineTestCase): 29 29 30 30 def setUp(self): … … 39 39 e.stopService() 40 40 41 class SynchronousMultiEngineTestCase(DeferredTestCase, 42 ISynchronousMultiEngineTestCase):41 42 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase): 43 43 44 44 def setUp(self): … … 52 52 for e in self.engines: 53 53 e.stopService() 54 55 56 class FullTwoPhaseMultiEngineTestCase(DeferredTestCase, IFullTwoPhaseMultiEngineTestCase): 57 58 def setUp(self): 59 self.controller = ControllerService() 60 self.controller.startService() 61 self.multiengine = me.IFullTwoPhaseMultiEngine(me.ISynchronousMultiEngine(me.IMultiEngine(self.controller))) 62 self.engines = [] 63 64 def tearDown(self): 65 self.controller.stopService() 66 for e in self.engines: 67 e.stopService() 68 69 70 class FullSynchronousTwoPhaseMultiEngineTestCase(DeferredTestCase, IFullSynchronousTwoPhaseMultiEngineTestCase): 71 72 def setUp(self): 73 self.controller = ControllerService() 74 self.controller.startService() 75 multiengine = me.IMultiEngine(self.controller) 76 multiengine = me.ISynchronousMultiEngine(multiengine) 77 multiengine = me.IFullTwoPhaseMultiEngine(multiengine) 78 self.multiengine = me.IFullSynchronousTwoPhaseMultiEngine(multiengine) 79 self.engines = [] 80 81 def tearDown(self): 82 self.controller.stopService() 83 for e in self.engines: 84 e.stopService() ipython1/branches/ipython1-client-r3021/ipython1/kernel/tests/test_multienginexmlrpc.py
r3036 r3041 11 11 from ipython1.kernel.controllerservice import ControllerService 12 12 from ipython1.kernel.multiengine import IMultiEngine 13 from ipython1.kernel.tests.multienginetest import (IMultiEngineTestCase, 14 ISynchronousMultiEngineTestCase) 13 from ipython1.kernel.tests.multienginetest import (ISynchronousMultiEngineTestCase, 14 IFullTwoPhaseMultiEngineTestCase, 15 IFullSynchronousTwoPhaseMultiEngineTestCase) 15 16 16 17 from ipython1.kernel.multienginexmlrpc import IXMLRPCMultiEngineFactory … … 18 19 from ipython1.kernel import multiengine as me 19 20 20 class BasicMultiEngineTestCase(DeferredTestCase, 21 IMultiEngineTestCase): 22 23 def setUp(self): 24 self.servers = [] 25 self.clients = [] 26 self.services = [] 27 28 self.controller = ControllerService() 29 self.controller.startService() 30 self.imultiengine = IMultiEngine(self.controller) 31 self.imultiengine_factory = IXMLRPCMultiEngineFactory(self.imultiengine) 32 self.servers.append(reactor.listenTCP(10105, self.imultiengine_factory)) 33 self.multiengine = me.IFullTwoPhaseMultiEngine(XMLRPCSynchronousMultiEngineClient(('localhost',10105))) 34 self.engines = [] 35 36 def tearDown(self): 37 l = [] 38 for s in self.servers: 39 try: 40 d = s.stopListening() 41 if d is not None: 42 l.append(d) 43 except: 44 pass 45 for c in self.clients: 46 c.disconnect() 47 del c 48 dl = defer.DeferredList(l) 49 self.controller.stopService() 50 for e in self.engines: 51 e.stopService() 52 return dl 53 54 class SynchronousMultiEngineTestCase(DeferredTestCase, 55 ISynchronousMultiEngineTestCase): 21 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase): 56 22 57 23 def setUp(self): … … 85 51 e.stopService() 86 52 return dl 53 54 55 class FullTwoPhaseMultiEngineTestCase(DeferredTestCase,IFullTwoPhaseMultiEngineTestCase): 56 57 def setUp(self): 58 self.servers = [] 59 self.clients = [] 60 self.services = [] 61 62 self.controller = ControllerService() 63 self.controller.startService() 64 self.imultiengine = IMultiEngine(self.controller) 65 self.imultiengine_factory = IXMLRPCMultiEngineFactory(self.imultiengine) 66 self.servers.append(reactor.listenTCP(10105, self.imultiengine_factory)) 67 multiengine = XMLRPCSynchronousMultiEngineClient(('localhost',10105)) 68 self.multiengine = me.IFullTwoPhaseMultiEngine(multiengine) 69 self.engines = [] 70 71 def tearDown(self): 72 l = [] 73 for s in self.servers: 74 try: 75 d = s.stopListening() 76 if d is not None: 77 l.append(d) 78 except: 79 pass 80 for c in self.clients: 81 c.disconnect() 82 del c 83 dl = defer.DeferredList(l) 84 self.controller.stopService() 85 for e in self.engines: 86 e.stopService() 87 return dl 88 89 90 class FullSynchronousTwoPhaseMultiEngineTestCase(DeferredTestCase,IFullSynchronousTwoPhaseMultiEngineTestCase): 91 92 def setUp(self): 93 self.servers = [] 94 self.clients = [] 95 self.services = [] 96 97 self.controller = ControllerService() 98 self.controller.startService() 99 self.imultiengine = IMultiEngine(self.controller) 100 self.imultiengine_factory = IXMLRPCMultiEngineFactory(self.imultiengine) 101 self.servers.append(reactor.listenTCP(10105, self.imultiengine_factory)) 102 multiengine = XMLRPCSynchronousMultiEngineClient(('localhost',10105)) 103 multiengine = me.IFullTwoPhaseMultiEngine(multiengine) 104 self.multiengine = me.IFullSynchronousTwoPhaseMultiEngine(multiengine) 105 self.engines = [] 106 107 def tearDown(self): 108 l = [] 109 for s in self.servers: 110 try: 111 d = s.stopListening() 112 if d is not None: 113 l.append(d) 114 except: 115 pass 116 for c in self.clients: 117 c.disconnect() 118 del c 119 dl = defer.DeferredList(l) 120 self.controller.stopService() 121 for e in self.engines: 122 e.stopService() 123 return dl
