Changeset 3043
- Timestamp:
- 02/12/08 17:43:35 (10 months ago)
- Files:
-
- ipython1/branches/ipython1-client-r3021/docs/examples/DistributedHelloWorld.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/docs/examples/fetchparse.py (modified) (4 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/mcdriver.py (modified) (2 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/nwmerge.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/parallel_pylab.ipy (modified) (2 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/phistogram.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/docs/examples/plotting_frontend.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/docs/examples/pwordfreq.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/rmt.ipy (modified) (2 diffs)
- ipython1/branches/ipython1-client-r3021/docs/examples/simpleiter.py (deleted)
- ipython1/branches/ipython1-client-r3021/docs/examples/task1.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py (modified) (2 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengineclient.py (modified) (3 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py (modified) (2 diffs)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster.py (modified) (1 diff)
- ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster1.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython1/branches/ipython1-client-r3021/docs/examples/DistributedHelloWorld.py
r2472 r3043 3 3 Ken Kinder <ken@kenkinder.com> 4 4 """ 5 import ipython1.kernel.api as kernel 6 import ipython1.kernel.multienginexmlrpc 7 import ipython1.kernel.taskxmlrpc 5 from ipython1.kernel import client 8 6 9 rc = kernel.TaskController(('127.0.0.1', 10113)) 10 ipc = kernel.RemoteController(('127.0.0.1', 10105)) 11 assert isinstance(rc, ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient) 12 assert isinstance(ipc, ipython1.kernel.multienginexmlrpc.XMLRPCInteractiveMultiEngineClient) 7 tc = client.TaskController(('127.0.0.1', 10113)) 8 mec = client.MultiEngineController(('127.0.0.1', 10105)) 13 9 14 ipc.execute('all','import time')15 helloTaskId = rc.run(kernel.Task('time.sleep(3) ; word = "Hello,"', resultNames=['word']))16 worldTaskId = rc.run(kernel.Task('time.sleep(3) ; word = "World!"', resultNames=['word']))10 mec.execute('import time') 11 helloTaskId = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', resultNames=['word'])) 12 worldTaskId = tc.run(client.Task('time.sleep(3) ; word = "World!"', resultNames=['word'])) 17 13 18 print rc.getTaskResult(helloTaskId).ns.word, rc.getTaskResult(worldTaskId).ns.word14 print tc.getTaskResult(helloTaskId).ns.word, tc.getTaskResult(worldTaskId).ns.word ipython1/branches/ipython1-client-r3021/docs/examples/fetchparse.py
r2451 r3043 10 10 """ 11 11 from twisted.python.failure import Failure 12 import ipython1.kernel.api as kernel 12 from ipython1.kernel import client 13 13 import time 14 14 … … 39 39 40 40 def __init__(self, site): 41 self.tc = kernel.TaskController(('127.0.0.1', 10113))42 self.rc = kernel.RemoteController(('127.0.0.1', 10105))43 self.rc.execute( 'all',fetchParse)41 self.tc = client.TaskController(('127.0.0.1', 10113)) 42 self.rc = client.MultiEngineController(('127.0.0.1', 10105)) 43 self.rc.execute(fetchParse) 44 44 45 45 self.allLinks = [] … … 54 54 if url.startswith(self.site): 55 55 print ' ', url 56 self.linksWorking[url] = self.tc.run( kernel.Task('links = fetchAndParse(url)', resultNames=['links'], setupNS={'url': url}))56 self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', resultNames=['links'], setupNS={'url': url})) 57 57 58 58 def onVisitDone(self, result, url): … … 65 65 print ' ', line 66 66 else: 67 for link in result. links:67 for link in result.ns.links: 68 68 self.visitLink(link) 69 69 ipython1/branches/ipython1-client-r3021/docs/examples/mcdriver.py
r2486 r3043 3 3 """Run a Monte-Carlo options pricer in parallel.""" 4 4 5 import ipython1.kernel.api as kernel 5 from ipython1.kernel import client 6 6 import numpy as N 7 7 from mcpricer import MCOptionPricer 8 8 9 9 10 tc = kernel.TaskController(('127.0.0.1', 10113))11 rc = kernel.RemoteController(('127.0.0.1', 10105))10 tc = client.TaskController(('127.0.0.1', 10113)) 11 rc = client.MultiEngineController(('127.0.0.1', 10105)) 12 12 13 13 # Initialize the common code on the engines 14 rc.run All('mcpricer.py')14 rc.run('mcpricer.py') 15 15 16 16 # Push the variables that won't change (stock print, interest rate, days and MC paths) 17 rc.push All(S=100.0, r=0.05, days=260, paths=10000)17 rc.push(dict(S=100.0, r=0.05, days=260, paths=10000)) 18 18 19 19 task_string = """\ … … 31 31 for K in K_vals: 32 32 for sigma in sigma_vals: 33 t = kernel.Task(task_string,33 t = client.Task(task_string, 34 34 setupNS=dict(sigma=sigma,K=K), 35 35 resultNames=['vp','ap','vc','ac','sigma','K']) ipython1/branches/ipython1-client-r3021/docs/examples/nwmerge.py
r2616 r3043 76 76 # Check that the object exists on the engine and pin a reference to it 77 77 iter_name = '_%s_rmt_iter_' % name 78 rc.execute( engine,'%s = iter(%s)' % (iter_name,name))78 rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine) 79 79 tpl = '_tmp = %s.next()' % iter_name 80 80 while True: 81 rc.execute( engine,tpl)82 yield rc.pull( engine,'_tmp')[0]81 rc.execute(tpl, targets=engine) 82 yield rc.pull('_tmp', targets=engine)[0] 83 83 84 84 … … 86 86 if __name__ == '__main__': 87 87 88 import ipython1.kernel.api as kernel89 ipc = kernel.RemoteController(('127.0.0.1',10105))88 from ipython1.kernel import client 89 ipc = client.RemoteController(('127.0.0.1',10105)) 90 90 print 'Engine IDs:',ipc.getIDs() 91 91 … … 98 98 # computation. In this simple example, we just send them over into the 99 99 # remote engines. They will all be called 'a' in each engine. 100 ipc.push( 0,a=a0)101 ipc.push( 1,a=a1)102 ipc.push( 2,a=a2)100 ipc.push(dict(a=a0), targets=0) 101 ipc.push(dict(a=a1), targets=1) 102 ipc.push(dict(a=a2), targets=2) 103 103 104 104 # And we now make a local object which represents the remote iterator ipython1/branches/ipython1-client-r3021/docs/examples/parallel_pylab.ipy
r2486 r3043 21 21 import numpy as N 22 22 from pylab import * 23 import ipython1.kernel.api as kernel 23 from ipython1.kernel import client 24 24 25 25 # Get an IPython1 client 26 rc = kernel.RemoteController(('127.0.0.1',10105))26 rc = client.MultiEngineController(('127.0.0.1',10105)) 27 27 rc.getIDs() 28 28 rc.activate() … … 40 40 41 41 # Bring back the data 42 x_local = rc.gather All('x')43 y_local = rc.gather All('y')42 x_local = rc.gather('x') 43 y_local = rc.gather('y') 44 44 45 45 # Make a scatter plot of the gathered data 46 scatter(x_local, y_local)46 plot(x_local, y_local,'ro') ipython1/branches/ipython1-client-r3021/docs/examples/phistogram.py
r2604 r3043 18 18 """ 19 19 nengines = len(rc) 20 rc.push All(bins=bins, rng=rng)21 rc.execute All('import numpy')22 rc.execute All('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)23 lower_edges = rc.pull( 0, 'lower_edges')24 hist_array = rc.gather All('hist')20 rc.push(dict(bins=bins, rng=rng)) 21 rc.execute('import numpy') 22 rc.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a) 23 lower_edges = rc.pull('lower_edges', targets=0) 24 hist_array = rc.gather('hist') 25 25 hist_array.shape = (nengines,-1) 26 26 total_hist = numpy.sum(hist_array, 0) ipython1/branches/ipython1-client-r3021/docs/examples/plotting_frontend.py
r2491 r3043 19 19 import numpy as N 20 20 from pylab import * 21 import ipython1.kernel.api as kernel 21 from ipython1.kernel import client 22 22 23 23 # Get an IPython1 client 24 rc = kernel.RemoteController(('127.0.0.1',10105))24 rc = client.RemoteController(('127.0.0.1',10105)) 25 25 rc.getIDs() 26 26 27 27 # Run the simulation on all the engines 28 rc.run All('plotting_backend.py')28 rc.run('plotting_backend.py') 29 29 30 30 # Bring back the data 31 number = rc.pull All('number')32 d_number = rc.pull All('d_number')33 downx = rc.gather All('downx')34 downy = rc.gather All('downy')35 downpx = rc.gather All('downpx')36 downpy = rc.gather All('downpy')31 number = rc.pull('number') 32 d_number = rc.pull('d_number') 33 downx = rc.gather('downx') 34 downy = rc.gather('downy') 35 downpx = rc.gather('downpx') 36 downpy = rc.gather('downpy') 37 37 38 38 print "number: ", sum(number) ipython1/branches/ipython1-client-r3021/docs/examples/pwordfreq.py
r2620 r3043 13 13 """ 14 14 15 rc.execute All('freqs = wordfreq(%s)' %text)16 freqs_list = rc.pull All('freqs')15 rc.execute('freqs = wordfreq(%s)' %text) 16 freqs_list = rc.pull('freqs') 17 17 word_set = set() 18 18 for f in freqs_list: … … 26 26 if __name__ == '__main__': 27 27 # Create a RemoteController 28 import ipython1.kernel.api as kernel29 ipc = kernel.RemoteController(('127.0.0.1',10105))28 from ipython1.kernel import client 29 ipc = client.MultiEngineController(('127.0.0.1',10105)) 30 30 31 31 # Run the wordfreq script on the engines. 32 ipc.run All('wordfreq.py')32 ipc.run('wordfreq.py') 33 33 34 34 # Run the serial version … … 41 41 print "\nParallel word frequency count:" 42 42 files = ['davinci%i.txt' % i for i in range(4)] 43 ipc.scatter All('textfile', files)44 ipc.execute All('text = open(textfile[0]).read()')43 ipc.scatter('textfile', files) 44 ipc.execute('text = open(textfile[0]).read()') 45 45 pfreqs = pwordfreq(ipc,'text') 46 46 print_wordfreq(freqs) ipython1/branches/ipython1-client-r3021/docs/examples/rmt.ipy
r3021 r3043 8 8 9 9 from rmtkernel import * 10 import ipython1.kernel.api as kernel 10 from ipython1.kernel import client 11 11 12 12 … … 32 32 num_per_engine = num/nengines 33 33 print "Running with", num_per_engine, "per engine." 34 rc.push All(num_per_engine=num_per_engine, N=N)35 rc.execute All('diffs = ensembleDiffs(num_per_engine, N)')34 rc.push(dict(num_per_engine=num_per_engine, N=N)) 35 rc.execute('diffs = ensembleDiffs(num_per_engine, N)') 36 36 # gather blocks always for now 37 return rc.gatherAll('diffs') 37 pr = rc.gather('diffs') 38 return pr.r 38 39 39 40 40 41 # Main code 41 42 if __name__ == '__main__': 42 rc = kernel.RemoteController(('127.0.0.1',10105))43 rc.run All('rmtkernel.py')43 rc = client.MultiEngineController(('127.0.0.1',10105)) 44 rc.run('rmtkernel.py') 44 45 rc.block = False 45 46 ipython1/branches/ipython1-client-r3021/docs/examples/task1.py
r2466 r3043 1 import ipython1.kernel.api as kernel 1 from ipython1.kernel import client 2 2 3 tc = kernel.TaskController(('127.0.0.1', 10113))4 rc = kernel.RemoteController(('127.0.0.1', 10105))3 tc = client.TaskController(('127.0.0.1', 10113)) 4 rc = client.MultiEngineController(('127.0.0.1', 10105)) 5 5 6 rc.push All(d=30)6 rc.push(dict(d=30)) 7 7 8 8 cmd1 = """\ 9 9 a = 5 10 10 b = 10*d 11 c = a*b*d 11 12 """ 12 13 13 t1 = kernel.Task(cmd1, clearBefore=False, clearAfter=True, resultNames=['a','b','c'])14 t1 = client.Task(cmd1, clearBefore=False, clearAfter=True, resultNames=['a','b','c']) 14 15 tid1 = tc.run(t1) 15 16 tr1 = tc.getTaskResult(tid1,block=True) ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py
r2476 r3043 20 20 21 21 from IPython.genutils import time 22 from ipython1.kernel import api as kernel22 from ipython1.kernel import client 23 23 24 24 def main(): … … 47 47 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin" 48 48 49 rc = kernel.RemoteController((opts.controller, opts.meport))50 tc = kernel.TaskController((opts.controller, opts.tport))49 rc = client.RemoteController((opts.controller, opts.meport)) 50 tc = client.TaskController((opts.controller, opts.tport)) 51 51 52 52 rc.block=True 53 53 nengines = len(rc.getIDs()) 54 rc.execute All('from IPython.genutils import time')54 rc.execute('from IPython.genutils import time') 55 55 56 56 # the jobs should take a random time within a range 57 57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)] 58 tasks = [ kernel.Task("time.sleep(%f)"%t) for t in times]58 tasks = [client.Task("time.sleep(%f)"%t) for t in times] 59 59 stime = sum(times) 60 60 ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py
r3027 r3043 21 21 RemoteController = MultiEngineController 22 22 23 default Address = (co['client']['connectToMultiEngineControllerOn']['ip'],23 defaultMECAddress = (co['client']['connectToMultiEngineControllerOn']['ip'], 24 24 co['client']['connectToMultiEngineControllerOn']['port']) 25 25 """The (ip,port) tuple of the default MultiEngineController.""" 26 26 27 from ipython1.kernel.task import Task, Dependency 28 29 TaskController = kernelConfigManager._import(co['client']['TaskController']) 30 """The default TaskController class obtained from config information.""" 31 32 defaultTCAddress = (co['client']['connectToTaskControllerOn']['ip'], 33 co['client']['connectToTaskControllerOn']['port']) 34 """The (ip,port) tuple of the default task controller.""" 27 35 28 36 ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py
r3027 r3043 108 108 clientConfig = { 109 109 'MultiEngineImplementation': 'ipython1.kernel.multienginexmlrpc.XMLRPCSynchronousMultiEngineClient', 110 'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort} 111 #'TaskController': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient',112 #'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort}110 'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort}, 111 'TaskController': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient', 112 'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort} 113 113 } 114 114 ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py
r3041 r3043 502 502 def getPendingDeferred(deferredID, block=True): 503 503 """""" 504 505 def cleanOutDeferreds(): 506 """""" 504 507 505 508 ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengineclient.py
r3041 r3043 167 167 168 168 def __cmp__(self, other): 169 return self.resultID - other.resultID 170 169 if self.resultID < other.resultID: 170 return -1 171 else: 172 return 1 173 171 174 def _get_r(self): 172 175 return self.getResult(block=True) … … 387 390 return blockingCallFromThread(self.stpmultiengine.getPendingDeferred, deferredID, block) 388 391 392 def barrier(self, *pendingResults): 393 """Synchronize a set of `PendingResults`. 394 395 This method is a synchronization primitive that waits for a set of 396 `PendingResult` objects to complete. More specifically, barier does 397 the following. 398 399 * The `PendingResult`s are sorted by resultID. 400 * The `getResult` method is called for each `PendingResult` sequentially 401 with block=True. 402 * If a `PendingResult` gets a result that is an exception, it is 403 trapped and can be re-raised later by calling `getResult` again. 404 * The `PendingResult`s are flushed from the controller. 405 406 After barrier has been called on a `PendingResult`, its results can 407 be retrieved by calling `getResult` again or accesing the `r` attribute 408 of the instance. 409 """ 410 411 # Convert to list for sorting and check class type 412 prList = list(pendingResults) 413 for pr in prList: 414 if not isinstance(pr, PendingResult): 415 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances") 416 417 # Sort the PendingResults so they are in order 418 prList.sort() 419 # Block on each PendingResult object 420 for pr in prList: 421 try: 422 result = pr.getResult(block=True) 423 except Exception: 424 pass 425 426 def flush(self, controller=False): 427 r1 = blockingCallFromThread(self.stpmultiengine.cleanOutDeferreds) 428 # Semi hack to get rid of the controllers pending deferreds as well. 429 if controller: 430 r2 = blockingCallFromThread(self.stpmultiengine.multiengine.smultiengine.cleanOutDeferreds) 431 return 432 389 433 #--------------------------------------------------------------------------- 390 434 # IEngineMultiplexer related methods … … 482 526 483 527 def getIDs(self): 484 return self._blockFromThread(self.stpmultiengine.getIDs) 485 528 result = blockingCallFromThread(self.stpmultiengine.getIDs) 529 return result 530 486 531 #--------------------------------------------------------------------------- 487 532 # IMultiEngineCoordinator ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py
r3037 r3043 143 143 return d 144 144 145 @packageResult 146 def xmlrpc_cleanOutDeferreds(self, request): 147 return self.smultiengine.cleanOutDeferreds() 148 145 149 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 146 150 self._deferredIDCallbacks[did] = (callback, args, kwargs) … … 352 356 return d 353 357 358 def cleanOutDeferreds(self): 359 d = self._proxy.callRemote('cleanOutDeferreds', deferredID, block) 360 d.addCallback(self.unpackage) 361 354 362 def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 355 363 self._deferredIDCallbacks[did] = (callback, args, kwargs) ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster.py
r2833 r3043 163 163 print 'Your cluster is up and running.' 164 164 print 165 print 'For interactive use, you can make a RemoteController with:'166 print 167 print ' import ipython1.kernel.api as kernel'168 print " rc = kernel.RemoteController((%r,%s))" % \165 print 'For interactive use, you can make a MultiEngineController with:' 166 print 167 print 'from ipython1.kernel import client' 168 print "mec = client.MultiEngineController((%r,%s))" % \ 169 169 (control_host,control_port) 170 170 print 171 171 print 'You can then cleanly stop the cluster from IPython using:' 172 172 print 173 print ' ipc.killAll(controller=True)'173 print 'mec.killAll(controller=True)' 174 174 print 175 175 ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster1.py
r2807 r3043 144 144 print 'Your cluster is up and running.' 145 145 print 146 print 'For interactive use, you can make a RemoteController with:'146 print 'For interactive use, you can make a MultiEngineController with:' 147 147 print 148 print ' import ipython1.kernel.api as kernel'149 print " ipc = kernel.RemoteController((%r,%s))" % \148 print 'from ipython1.kernel import client' 149 print "mec = client.MultiEngineController((%r,%s))" % \ 150 150 (control_host,control_port) 151 151 print 152 152 print 'You can then cleanly stop the cluster from IPython using:' 153 153 print 154 print ' ipc.killAll(controller=True)'154 print 'mec.killAll(controller=True)' 155 155 print 156 156
