Changeset 3043

Show
Ignore:
Timestamp:
02/12/08 17:43:35 (10 months ago)
Author:
bgranger
Message:

Updated tests to use new API.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ipython1/branches/ipython1-client-r3021/docs/examples/DistributedHelloWorld.py

    r2472 r3043  
    33Ken Kinder <ken@kenkinder.com> 
    44""" 
    5 import ipython1.kernel.api as kernel 
    6 import ipython1.kernel.multienginexmlrpc 
    7 import ipython1.kernel.taskxmlrpc 
     5from ipython1.kernel import client 
    86 
    9 rc = kernel.TaskController(('127.0.0.1', 10113)) 
    10 ipc = kernel.RemoteController(('127.0.0.1', 10105)) 
    11 assert isinstance(rc, ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient) 
    12 assert isinstance(ipc, ipython1.kernel.multienginexmlrpc.XMLRPCInteractiveMultiEngineClient) 
     7tc = client.TaskController(('127.0.0.1', 10113)) 
     8mec = client.MultiEngineController(('127.0.0.1', 10105)) 
    139 
    14 ipc.execute('all', 'import time') 
    15 helloTaskId = rc.run(kernel.Task('time.sleep(3) ; word = "Hello,"', resultNames=['word'])) 
    16 worldTaskId = rc.run(kernel.Task('time.sleep(3) ; word = "World!"', resultNames=['word'])) 
     10mec.execute('import time') 
     11helloTaskId = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', resultNames=['word'])) 
     12worldTaskId = tc.run(client.Task('time.sleep(3) ; word = "World!"', resultNames=['word'])) 
    1713 
    18 print rc.getTaskResult(helloTaskId).ns.word, rc.getTaskResult(worldTaskId).ns.word 
     14print tc.getTaskResult(helloTaskId).ns.word, tc.getTaskResult(worldTaskId).ns.word 
  • ipython1/branches/ipython1-client-r3021/docs/examples/fetchparse.py

    r2451 r3043  
    1010""" 
    1111from twisted.python.failure import Failure 
    12 import ipython1.kernel.api as kernel 
     12from ipython1.kernel import client 
    1313import time 
    1414 
     
    3939     
    4040    def __init__(self, site): 
    41         self.tc = kernel.TaskController(('127.0.0.1', 10113)) 
    42         self.rc = kernel.RemoteController(('127.0.0.1', 10105)) 
    43         self.rc.execute('all', fetchParse) 
     41        self.tc = client.TaskController(('127.0.0.1', 10113)) 
     42        self.rc = client.MultiEngineController(('127.0.0.1', 10105)) 
     43        self.rc.execute(fetchParse) 
    4444         
    4545        self.allLinks = [] 
     
    5454            if url.startswith(self.site): 
    5555                print '    ', url 
    56                 self.linksWorking[url] = self.tc.run(kernel.Task('links = fetchAndParse(url)', resultNames=['links'], setupNS={'url': url})) 
     56                self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', resultNames=['links'], setupNS={'url': url})) 
    5757         
    5858    def onVisitDone(self, result, url): 
     
    6565                print '    ', line 
    6666        else: 
    67             for link in result.links: 
     67            for link in result.ns.links: 
    6868                self.visitLink(link) 
    6969                 
  • ipython1/branches/ipython1-client-r3021/docs/examples/mcdriver.py

    r2486 r3043  
    33"""Run a Monte-Carlo options pricer in parallel.""" 
    44 
    5 import ipython1.kernel.api as kernel 
     5from ipython1.kernel import client 
    66import numpy as N 
    77from mcpricer import MCOptionPricer 
    88 
    99 
    10 tc = kernel.TaskController(('127.0.0.1', 10113)) 
    11 rc = kernel.RemoteController(('127.0.0.1', 10105)) 
     10tc = client.TaskController(('127.0.0.1', 10113)) 
     11rc = client.MultiEngineController(('127.0.0.1', 10105)) 
    1212 
    1313# Initialize the common code on the engines 
    14 rc.runAll('mcpricer.py') 
     14rc.run('mcpricer.py') 
    1515 
    1616# Push the variables that won't change (stock print, interest rate, days and MC paths) 
    17 rc.pushAll(S=100.0, r=0.05, days=260, paths=10000
     17rc.push(dict(S=100.0, r=0.05, days=260, paths=10000)
    1818 
    1919task_string = """\ 
     
    3131for K in K_vals: 
    3232    for sigma in sigma_vals: 
    33         t = kernel.Task(task_string,  
     33        t = client.Task(task_string,  
    3434            setupNS=dict(sigma=sigma,K=K), 
    3535            resultNames=['vp','ap','vc','ac','sigma','K']) 
  • ipython1/branches/ipython1-client-r3021/docs/examples/nwmerge.py

    r2616 r3043  
    7676    # Check that the object exists on the engine and pin a reference to it 
    7777    iter_name = '_%s_rmt_iter_' % name 
    78     rc.execute(engine,'%s = iter(%s)' % (iter_name,name)
     78    rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine
    7979    tpl = '_tmp = %s.next()' % iter_name 
    8080    while True: 
    81         rc.execute(engine,tpl
    82         yield rc.pull(engine,'_tmp')[0] 
     81        rc.execute(tpl, targets=engine
     82        yield rc.pull('_tmp', targets=engine)[0] 
    8383 
    8484 
     
    8686if __name__ == '__main__': 
    8787 
    88     import ipython1.kernel.api as kernel 
    89     ipc = kernel.RemoteController(('127.0.0.1',10105)) 
     88    from ipython1.kernel import client 
     89    ipc = client.RemoteController(('127.0.0.1',10105)) 
    9090    print 'Engine IDs:',ipc.getIDs() 
    9191 
     
    9898    # computation.  In this simple example, we just send them over into the 
    9999    # remote engines.  They will all be called 'a' in each engine. 
    100     ipc.push(0,a=a0) 
    101     ipc.push(1,a=a1) 
    102     ipc.push(2,a=a2) 
     100    ipc.push(dict(a=a0), targets=0) 
     101    ipc.push(dict(a=a1), targets=1) 
     102    ipc.push(dict(a=a2), targets=2) 
    103103 
    104104    # And we now make a local object which represents the remote iterator 
  • ipython1/branches/ipython1-client-r3021/docs/examples/parallel_pylab.ipy

    r2486 r3043  
    2121import numpy as N 
    2222from pylab import * 
    23 import ipython1.kernel.api as kernel 
     23from ipython1.kernel import client 
    2424 
    2525# Get an IPython1 client 
    26 rc = kernel.RemoteController(('127.0.0.1',10105)) 
     26rc = client.MultiEngineController(('127.0.0.1',10105)) 
    2727rc.getIDs() 
    2828rc.activate() 
     
    4040 
    4141# Bring back the data 
    42 x_local = rc.gatherAll('x') 
    43 y_local = rc.gatherAll('y') 
     42x_local = rc.gather('x') 
     43y_local = rc.gather('y') 
    4444 
    4545# Make a scatter plot of the gathered data 
    46 scatter(x_local, y_local
     46plot(x_local, y_local,'ro'
  • ipython1/branches/ipython1-client-r3021/docs/examples/phistogram.py

    r2604 r3043  
    1818    """ 
    1919    nengines = len(rc) 
    20     rc.pushAll(bins=bins, rng=rng
    21     rc.executeAll('import numpy') 
    22     rc.executeAll('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a) 
    23     lower_edges = rc.pull(0, 'lower_edges'
    24     hist_array = rc.gatherAll('hist') 
     20    rc.push(dict(bins=bins, rng=rng)
     21    rc.execute('import numpy') 
     22    rc.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a) 
     23    lower_edges = rc.pull('lower_edges', targets=0
     24    hist_array = rc.gather('hist') 
    2525    hist_array.shape = (nengines,-1) 
    2626    total_hist = numpy.sum(hist_array, 0) 
  • ipython1/branches/ipython1-client-r3021/docs/examples/plotting_frontend.py

    r2491 r3043  
    1919import numpy as N 
    2020from pylab import * 
    21 import ipython1.kernel.api as kernel 
     21from ipython1.kernel import client 
    2222 
    2323# Get an IPython1 client 
    24 rc = kernel.RemoteController(('127.0.0.1',10105)) 
     24rc = client.RemoteController(('127.0.0.1',10105)) 
    2525rc.getIDs() 
    2626 
    2727# Run the simulation on all the engines 
    28 rc.runAll('plotting_backend.py') 
     28rc.run('plotting_backend.py') 
    2929 
    3030# Bring back the data 
    31 number = rc.pullAll('number') 
    32 d_number = rc.pullAll('d_number') 
    33 downx = rc.gatherAll('downx') 
    34 downy = rc.gatherAll('downy') 
    35 downpx = rc.gatherAll('downpx') 
    36 downpy = rc.gatherAll('downpy') 
     31number = rc.pull('number') 
     32d_number = rc.pull('d_number') 
     33downx = rc.gather('downx') 
     34downy = rc.gather('downy') 
     35downpx = rc.gather('downpx') 
     36downpy = rc.gather('downpy') 
    3737 
    3838print "number: ", sum(number) 
  • ipython1/branches/ipython1-client-r3021/docs/examples/pwordfreq.py

    r2620 r3043  
    1313    """ 
    1414     
    15     rc.executeAll('freqs = wordfreq(%s)' %text) 
    16     freqs_list = rc.pullAll('freqs') 
     15    rc.execute('freqs = wordfreq(%s)' %text) 
     16    freqs_list = rc.pull('freqs') 
    1717    word_set = set() 
    1818    for f in freqs_list: 
     
    2626if __name__ == '__main__': 
    2727    # Create a RemoteController 
    28     import ipython1.kernel.api as kernel 
    29     ipc = kernel.RemoteController(('127.0.0.1',10105)) 
     28    from ipython1.kernel import client 
     29    ipc = client.MultiEngineController(('127.0.0.1',10105)) 
    3030     
    3131    # Run the wordfreq script on the engines. 
    32     ipc.runAll('wordfreq.py') 
     32    ipc.run('wordfreq.py') 
    3333 
    3434    # Run the serial version 
     
    4141    print "\nParallel word frequency count:" 
    4242    files = ['davinci%i.txt' % i for i in range(4)] 
    43     ipc.scatterAll('textfile', files) 
    44     ipc.executeAll('text = open(textfile[0]).read()') 
     43    ipc.scatter('textfile', files) 
     44    ipc.execute('text = open(textfile[0]).read()') 
    4545    pfreqs = pwordfreq(ipc,'text') 
    4646    print_wordfreq(freqs) 
  • ipython1/branches/ipython1-client-r3021/docs/examples/rmt.ipy

    r3021 r3043  
    88 
    99from rmtkernel import * 
    10 import ipython1.kernel.api as kernel 
     10from ipython1.kernel import client 
    1111 
    1212 
     
    3232    num_per_engine = num/nengines 
    3333    print "Running with", num_per_engine, "per engine." 
    34     rc.pushAll(num_per_engine=num_per_engine, N=N
    35     rc.executeAll('diffs = ensembleDiffs(num_per_engine, N)') 
     34    rc.push(dict(num_per_engine=num_per_engine, N=N)
     35    rc.execute('diffs = ensembleDiffs(num_per_engine, N)') 
    3636    # gather blocks always for now 
    37     return rc.gatherAll('diffs') 
     37    pr = rc.gather('diffs') 
     38    return pr.r 
    3839 
    3940 
    4041# Main code 
    4142if __name__ == '__main__': 
    42     rc = kernel.RemoteController(('127.0.0.1',10105)) 
    43     rc.runAll('rmtkernel.py') 
     43    rc = client.MultiEngineController(('127.0.0.1',10105)) 
     44    rc.run('rmtkernel.py') 
    4445    rc.block = False 
    4546 
  • ipython1/branches/ipython1-client-r3021/docs/examples/task1.py

    r2466 r3043  
    1 import ipython1.kernel.api as kernel 
     1from ipython1.kernel import client 
    22 
    3 tc = kernel.TaskController(('127.0.0.1', 10113)) 
    4 rc = kernel.RemoteController(('127.0.0.1', 10105)) 
     3tc = client.TaskController(('127.0.0.1', 10113)) 
     4rc = client.MultiEngineController(('127.0.0.1', 10105)) 
    55 
    6 rc.pushAll(d=30
     6rc.push(dict(d=30)
    77 
    88cmd1 = """\ 
    99a = 5 
    1010b = 10*d 
     11c = a*b*d 
    1112""" 
    1213 
    13 t1 = kernel.Task(cmd1, clearBefore=False, clearAfter=True, resultNames=['a','b','c']) 
     14t1 = client.Task(cmd1, clearBefore=False, clearAfter=True, resultNames=['a','b','c']) 
    1415tid1 = tc.run(t1) 
    1516tr1 = tc.getTaskResult(tid1,block=True) 
  • ipython1/branches/ipython1-client-r3021/docs/examples/task_profiler.py

    r2476 r3043  
    2020 
    2121from IPython.genutils import time 
    22 from ipython1.kernel import api as kernel 
     22from ipython1.kernel import client 
    2323 
    2424def main(): 
     
    4747    assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin" 
    4848     
    49     rc = kernel.RemoteController((opts.controller, opts.meport)) 
    50     tc = kernel.TaskController((opts.controller, opts.tport)) 
     49    rc = client.RemoteController((opts.controller, opts.meport)) 
     50    tc = client.TaskController((opts.controller, opts.tport)) 
    5151     
    5252    rc.block=True 
    5353    nengines = len(rc.getIDs()) 
    54     rc.executeAll('from IPython.genutils import time') 
     54    rc.execute('from IPython.genutils import time') 
    5555 
    5656    # the jobs should take a random time within a range 
    5757    times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)] 
    58     tasks = [kernel.Task("time.sleep(%f)"%t) for t in times] 
     58    tasks = [client.Task("time.sleep(%f)"%t) for t in times] 
    5959    stime = sum(times) 
    6060     
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/client.py

    r3027 r3043  
    2121RemoteController = MultiEngineController 
    2222 
    23 defaultAddress = (co['client']['connectToMultiEngineControllerOn']['ip'], 
     23defaultMECAddress = (co['client']['connectToMultiEngineControllerOn']['ip'], 
    2424    co['client']['connectToMultiEngineControllerOn']['port']) 
    2525"""The (ip,port) tuple of the default MultiEngineController.""" 
    2626 
     27from ipython1.kernel.task import Task, Dependency 
     28 
     29TaskController = kernelConfigManager._import(co['client']['TaskController']) 
     30"""The default TaskController class obtained from config information.""" 
     31 
     32defaultTCAddress = (co['client']['connectToTaskControllerOn']['ip'], 
     33    co['client']['connectToTaskControllerOn']['port']) 
     34"""The (ip,port) tuple of the default task controller.""" 
    2735 
    2836 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/config/__init__.py

    r3027 r3043  
    108108clientConfig = { 
    109109    'MultiEngineImplementation': 'ipython1.kernel.multienginexmlrpc.XMLRPCSynchronousMultiEngineClient', 
    110     'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort} 
    111     # 'TaskController': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient', 
    112     # 'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort} 
     110    'connectToMultiEngineControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcMEPort}, 
     111    'TaskController': 'ipython1.kernel.taskxmlrpc.XMLRPCInteractiveTaskClient', 
     112    'connectToTaskControllerOn': {'ip': '127.0.0.1', 'port': xmlrpcTCPort} 
    113113} 
    114114 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengine.py

    r3041 r3043  
    502502    def getPendingDeferred(deferredID, block=True): 
    503503        """""" 
     504     
     505    def cleanOutDeferreds(): 
     506        """""" 
    504507 
    505508 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/multiengineclient.py

    r3041 r3043  
    167167         
    168168    def __cmp__(self, other): 
    169         return self.resultID - other.resultID 
    170          
     169        if self.resultID < other.resultID: 
     170            return -1 
     171        else: 
     172            return 1 
     173             
    171174    def _get_r(self): 
    172175        return self.getResult(block=True) 
     
    387390        return blockingCallFromThread(self.stpmultiengine.getPendingDeferred, deferredID, block) 
    388391     
     392    def barrier(self, *pendingResults): 
     393        """Synchronize a set of `PendingResults`. 
     394         
     395        This method is a synchronization primitive that waits for a set of 
     396        `PendingResult` objects to complete.  More specifically, barier does 
     397        the following. 
     398         
     399        * The `PendingResult`s are sorted by resultID. 
     400        * The `getResult` method is called for each `PendingResult` sequentially 
     401          with block=True. 
     402        * If a `PendingResult` gets a result that is an exception, it is  
     403          trapped and can be re-raised later by calling `getResult` again. 
     404        * The `PendingResult`s are flushed from the controller. 
     405                 
     406        After barrier has been called on a `PendingResult`, its results can  
     407        be retrieved by calling `getResult` again or accesing the `r` attribute 
     408        of the instance. 
     409        """ 
     410 
     411        # Convert to list for sorting and check class type  
     412        prList = list(pendingResults) 
     413        for pr in prList: 
     414            if not isinstance(pr, PendingResult): 
     415                raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances") 
     416                             
     417        # Sort the PendingResults so they are in order 
     418        prList.sort() 
     419        # Block on each PendingResult object 
     420        for pr in prList: 
     421            try: 
     422                result = pr.getResult(block=True) 
     423            except Exception: 
     424                pass 
     425     
     426    def flush(self, controller=False): 
     427        r1 = blockingCallFromThread(self.stpmultiengine.cleanOutDeferreds) 
     428        # Semi hack to get rid of the controllers pending deferreds as well. 
     429        if controller: 
     430            r2 = blockingCallFromThread(self.stpmultiengine.multiengine.smultiengine.cleanOutDeferreds) 
     431        return 
     432     
    389433    #--------------------------------------------------------------------------- 
    390434    # IEngineMultiplexer related methods 
     
    482526     
    483527    def getIDs(self): 
    484         return self._blockFromThread(self.stpmultiengine.getIDs) 
    485      
     528        result = blockingCallFromThread(self.stpmultiengine.getIDs) 
     529        return result 
     530         
    486531    #--------------------------------------------------------------------------- 
    487532    # IMultiEngineCoordinator 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/multienginexmlrpc.py

    r3037 r3043  
    143143        return d 
    144144        
     145    @packageResult 
     146    def xmlrpc_cleanOutDeferreds(self, request): 
     147        return self.smultiengine.cleanOutDeferreds() 
     148     
    145149    def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 
    146150        self._deferredIDCallbacks[did] = (callback, args, kwargs) 
     
    352356        return d 
    353357     
     358    def cleanOutDeferreds(self): 
     359        d = self._proxy.callRemote('cleanOutDeferreds', deferredID, block) 
     360        d.addCallback(self.unpackage)         
     361     
    354362    def _addDeferredIDCallback(self, did, callback, *args, **kwargs): 
    355363        self._deferredIDCallbacks[did] = (callback, args, kwargs) 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster.py

    r2833 r3043  
    163163    print 'Your cluster is up and running.' 
    164164    print 
    165     print 'For interactive use, you can make a Remote Controller with:' 
    166     print 
    167     print 'import ipython1.kernel.api as kernel
    168     print "rc = kernel.RemoteController((%r,%s))" % \ 
     165    print 'For interactive use, you can make a MultiEngineController with:' 
     166    print 
     167    print 'from ipython1.kernel import client
     168    print "mec = client.MultiEngineController((%r,%s))" % \ 
    169169          (control_host,control_port) 
    170170    print 
    171171    print 'You can then cleanly stop the cluster from IPython using:' 
    172172    print 
    173     print 'ipc.killAll(controller=True)' 
     173    print 'mec.killAll(controller=True)' 
    174174    print 
    175175 
  • ipython1/branches/ipython1-client-r3021/ipython1/kernel/scripts/ipcluster1.py

    r2807 r3043  
    144144    print 'Your cluster is up and running.' 
    145145    print 
    146     print 'For interactive use, you can make a Remote Controller with:' 
     146    print 'For interactive use, you can make a MultiEngineController with:' 
    147147    print 
    148     print 'import ipython1.kernel.api as kernel
    149     print "ipc = kernel.RemoteController((%r,%s))" % \ 
     148    print 'from ipython1.kernel import client
     149    print "mec = client.MultiEngineController((%r,%s))" % \ 
    150150          (control_host,control_port) 
    151151    print 
    152152    print 'You can then cleanly stop the cluster from IPython using:' 
    153153    print 
    154     print 'ipc.killAll(controller=True)' 
     154    print 'mec.killAll(controller=True)' 
    155155    print 
    156156