Changeset 2094

Show
Ignore:
Timestamp:
02/13/07 18:18:35 (4 years ago)
Author:
bgranger
Message:

The ControllerService? now logs the ip/port/pid of each engine that
connects to a file in IPYTHONRC/log/ipcontroller-###-engine-info.log.

There were some conflict in controllerservice.py that I also fixed. These
were from Min and I both editing this file.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • ipython/branches/saw/doc/ChangeLog

    r2063 r2094  
     12007-02-13  Brian Granger <ellisonbg@gmail.com> 
     2 
     3    * Now the registerEngine method of the ControllerService takes the 
     4    ip, port and pid of the engine and logs them to a file in  
     5    ~/.ipython/log named ipcontroller-###-engine-info.log.  This is  
     6    to make it easier to kill engines.  Fixes ticket #111. 
     7 
     8 
    192007-01-30  Fernando Perez  <Fernando.Perez@colorado.edu> 
    210 
  • ipython/branches/saw/ipython1/kernel/controllerservice.py

    r2079 r2094  
    3737#------------------------------------------------------------------------------- 
    3838 
     39import os, sys 
     40 
    3941from twisted.application import service 
    4042from twisted.internet import defer, reactor 
     
    4850    IEngineSerialized, \ 
    4951    IEngineQueued 
    50  
     52     
     53from ipython1.config import cutils 
    5154 
    5255#------------------------------------------------------------------------------- 
     
    6467    engines = Attribute("A dict of engine ids and engine instances.") 
    6568         
    66     def registerEngine(remoteEngine, id): 
     69    def registerEngine(remoteEngine, id=None, ip=None, port=None,  
     70        pid=None): 
    6771        """Register new remote engine. 
    6872         
     
    7680    def unregisterEngine(id): 
    7781        """Handle a disconnecting engine.""" 
    78      
     82         
    7983    def onRegisterEngineDo(f, includeID, *args, **kwargs): 
    8084        """call f with *args and **kwargs when an engine is registered.   
     
    9094    def onUnregisterEngineDo(f): 
    9195        """stop calling f on unregistration""" 
    92      
    9396                 
    9497class IControllerBase(IControllerCore): 
     
    119122     
    120123    #--------------------------------------------------------------------------- 
     124    # Methods used to save the engine info to a log file 
     125    #--------------------------------------------------------------------------- 
     126     
     127    def _buildEngineInfoString(self, id, ip, port, pid): 
     128        if id is None: 
     129            id = -99 
     130        if ip is None: 
     131            ip = "-99" 
     132        if port is None: 
     133            port = -99 
     134        if pid is None: 
     135            pid = -99 
     136        return "Engine Info: %d %s %d %d" % (id, ip , port, pid) 
     137         
     138    def _logEngineInfo(self, id, ip, port, pid): 
     139        log.msg(self._buildEngineInfoString(id,ip,port,pid)) 
     140     
     141    def _getEngineInfoLogFile(self): 
     142        # Store all logs inside the ipython directory 
     143        ipdir = cutils.getIpythonDir() 
     144        pjoin = os.path.join 
     145        logdir_base = pjoin(ipdir,'log') 
     146        if not os.path.isdir(logdir_base): 
     147            os.makedirs(logdir_base) 
     148        logfile = os.path.join(logdir_base,'ipcontroller-%s-engine-info.log' % os.getpid()) 
     149        return logfile 
     150     
     151    def _logEngineInfoToFile(self, id, ip, port, pid): 
     152        """Log info about an engine to a log file. 
     153         
     154        When an engine registers with a ControllerService, the ControllerService 
     155        saves information about the engine to a log file.  That information 
     156        can be useful for various purposes, such as killing hung engines, etc. 
     157         
     158        This method takes the assigned id, ip/port and pid of the engine 
     159        and saves it to a file of the form: 
     160         
     161        ~/.ipython/log/ipcontroller-###-engine-info.log 
     162         
     163        where ### is the pid of the controller. 
     164         
     165        Each line of this file has the form: 
     166         
     167        Engine Info: ip ip port pid 
     168         
     169        If any of the entries are not known, they are replaced by -99. 
     170        """ 
     171         
     172        fname = self._getEngineInfoLogFile() 
     173        f = open(fname, 'a') 
     174        s = self._buildEngineInfoString(id,ip,port,pid) 
     175        f.write(s + '\n') 
     176        f.close() 
     177     
     178    #--------------------------------------------------------------------------- 
    121179    # IControllerCore methods 
    122180    #--------------------------------------------------------------------------- 
    123181         
    124     def registerEngine(self, remoteEngine, id=None): 
     182    def registerEngine(self, remoteEngine, id=None, 
     183        ip=None, port=None, pid=None): 
    125184        """Register new engine connection""" 
    126185         
     
    132191        assert IEngineQueued.providedBy(remoteEngine), \ 
    133192            "engine passed to registerEngine doesn't provide IEngineQueued" 
    134          
     193        assert isinstance(id, int) or id is None, \ 
     194            "id  to registerEngine must be an integer or None" 
     195        assert isinstance(ip, str) or ip is None, \ 
     196            "ip  to registerEngine must be a string or None" 
     197        assert isinstance(port, int) or port is None, \ 
     198            "port to registerEngine must be an integer or None" 
     199        assert isinstance(pid, int) or pid is None, \ 
     200            "pid to registerEngine must be an integer or None" 
     201             
    135202        desiredID = id 
    136203        if desiredID in self.engines.keys(): 
     
    145212        remoteEngine.service = self 
    146213        self.engines[getID] = remoteEngine 
     214 
     215        # Log the Engine Information for monitoring purposes 
     216        self._logEngineInfoToFile(getID, ip, port, pid) 
     217 
    147218        msg = "registered engine: %i" %getID 
    148219        log.msg(msg) 
     
    176247            else: 
    177248                log.msg("preserving id %i" %id) 
    178          
     249     
    179250        for i in range(len(self._onUnregister)): 
    180251            (f,args,kwargs,ifid) = self._onUnregister[i] 
     
    231302        self.engines = self.controller.engines 
    232303         
    233     def registerEngine(self, remoteEngine, id=None): 
    234         return self.controller.registerEngine(remoteEngine, id) 
     304    def registerEngine(self, remoteEngine, id=None, 
     305        ip=None, port=None, pid=None): 
     306        return self.controller.registerEngine(remoteEngine,  
     307            id, ip, port, pid) 
    235308     
    236309    def unregisterEngine(self, id): 
  • ipython/branches/saw/ipython1/kernel/enginepb.py

    r2069 r2094  
    3434#------------------------------------------------------------------------------- 
    3535 
     36import os 
    3637import cPickle as pickle 
    3738 
     
    172173         
    173174        self.rootObject = obj 
    174         d = self.rootObject.callRemote('registerEngine', self.engineReference, None) 
     175        # Now register myself with the controller 
     176        desiredID = self.service.id 
     177        d = self.rootObject.callRemote('registerEngine', self.engineReference,  
     178            desiredID, os.getpid()) 
    175179        return d.addCallbacks(self._referenceSent, self._getRootFailure) 
    176180     
     
    631635    """ 
    632636     
    633     def remote_registerEngine(self, engineReference): 
     637    def remote_registerEngine(self, engineReference, id=None, pid=None): 
    634638        """Register new engine on controller.""" 
    635639     
     
    649653        self.service = service 
    650654     
    651     def remote_registerEngine(self, engineReference, id, *interfaces): 
     655    def remote_registerEngine(self, engineReference, id=None, pid=None): 
    652656        # First adapt the engineReference to a basic non-queued engine 
    653657        engine = IEngineBase(engineReference) 
    654658        # Make it an IQueuedEngine before registration 
    655659        remoteEngine = IEngineQueued(engine) 
    656         regDict = self.service.registerEngine(remoteEngine, id) 
     660        # Get the ip/port of the remote side 
     661        peerAddress = engineReference.broker.transport.getPeer() 
     662        ip = peerAddress.host 
     663        port = peerAddress.port 
     664        regDict = self.service.registerEngine(remoteEngine, id, ip, port, pid) 
    657665        # Now setup callback for disconnect and unregistering the engine 
    658666        def notify(*args): 
  • ipython/branches/saw/ipython1/kernel/multiengineclient.py

    r2067 r2094  
    171171        return self._blockOrNot(d) 
    172172     
     173    def scatterAll(self, key, seq, style='basic', flatten=False): 
     174        return self.scatter('all', key, seq, style, flatten) 
     175         
    173176    def gather(self, targets, key, style='basic'): 
    174177        self.connect() 
     
    176179        return self._blockOrNot(d) 
    177180         
     181    def gatherAll(self, key, style='basic'): 
     182        return self.gather('all', key, style) 
    178183 
    179184#------------------------------------------------------------------------------- 
     
    329334        :return: ``True`` or ``False`` to indicate success or failure.     
    330335        """ 
    331          
    332336        fileobj = open(fname,'r') 
    333337        source = fileobj.read() 
     
    339343        # Now run the code 
    340344        return self.execute(targets, source) 
    341      
     345         
    342346    def runAll(self, fname): 
    343347        """Run a .py file on all engines. 
     
    397401         - `id`: A string representing the key. 
    398402        """ 
    399          
    400403        if isinstance(id, slice): 
    401404            return InteractiveMultiEngineClientView(self, id) 
     
    403406            return EngineProxy(self, id) 
    404407        elif isinstance(id, str): 
     408            self.connect() 
    405409            return self.pull('all', *(id,)) 
    406410        else: 
     
    409413    def __len__(self): 
    410414        """Return the number of available engines.""" 
    411         d = self.self.getIDs() 
     415        saveBlock = self.block 
     416        self.block = False 
     417        d = self.getIDs() 
    412418        d.addCallback(len) 
    413         return self.blockOrNot(d) 
     419        self.block = saveBlock 
     420        return self._blockOrNot(d) 
    414421         
    415422    def map(self, targets, functionSource, seq, style='basic'): 
     
    445452            '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % \ 
    446453            functionSource         
    447         d = self.scatter(targets, '_ipython_map_seq', seq, style='basic') 
    448         d.addCallback(lambda _: self.execute(targets, sourceToRun)) 
    449         d.addCallback(lambda _: self.gather(targets, '_ipython_map_seq_result', style='basic')) 
     454        d1 = self.scatter(targets, '_ipython_map_seq', seq, style) 
     455        d2 = self.execute(targets, sourceToRun) 
     456        d3 = self.gather(targets, '_ipython_map_seq_result', style) 
     457        d = gatherBoth([d1 ,d2, d3], fireOnOneErrback=1, consumeErrors=1) 
     458        d.addCallback(lambda r: r[2]) 
    450459        self.block = saveBlock 
    451460        return self._blockOrNot(d) 
  • ipython/branches/saw/ipython1/test/test_enginepb.py

    r2057 r2094  
    9191    #--------------------------------------------------------------------------- 
    9292     
    93     def registerEngine(self, remoteEngine, id): 
     93    def registerEngine(self, remoteEngine, id=None, ip=None, port=None, pid=None): 
    9494        self.engine = remoteEngine 
    9595        # This fires the callbackchain to allow the tests to run 
  • ipython/branches/saw/ipython1/test/test_enginevanilla.py

    r2057 r2094  
    9090    MAX_MESSAGE_SIZE = 2*640*1024 
    9191     
    92     def registerEngine(self, remoteEngine, id): 
     92    def registerEngine(self, remoteEngine, id=None, ip=None, port=None, pid=None): 
    9393        self.engine = remoteEngine 
    9494        self.engine.id = 0 
  • ipython/branches/saw/ipython1/test/tst_multienginepb.ipy

    r2075 r2094  
    1 from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient 
     1import time 
     2 
     3from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient as client 
     4from ipython1.kernel import magic 
     5 
     6# Setup 
     7 
     8c = client(('127.0.0.1',10111)) 
     9c.connect() 
     10c.activate() 
     11c.block = True 
     12assert len(c) >= 4 
     13 
     14# Execute 
     15 
     16c.executeAll('a=5') 
     17c.execute([0,2],'b=10') 
     18c.execute([1,3],'b=20') 
     19c.executeAll('c=a*b') 
     20%result 
     21 
     22c.block=False 
     23 
     24d = c.executeAll('import time') 
     25d.addCallback(lambda _: c.executeAll('time.sleep(1)')) 
     26d.addCallback(lambda _: c.executeAll('import math')) 
     27c.blockOn(d) 
     28d = c.executeAll('a = 1.0/0') 
     29d.addErrback(lambda f: None) 
     30c.blockOn(d) 
     31 
     32c.block=True 
     33 
     34%px a = 5 
     35%px print a 
     36%pn 0 b = 10 
     37%pn 0 print b 
     38 
     39%autopx 
     40 
     41def f(x): 
     42    return x*2 
     43     
     44 
     45print f(1000000000) 
     46 
     47%autopx 
     48 
     49# Push/Pull 
     50 
     51c.pushAll(a=10, b=20) 
     52c.pullAll('a', 'b') 
     53c.pullAll('a') 
     54c.push([0,1], c=range(5)) 
     55c.push([2,3], c=range(5)) 
     56c.pullAll('c') 
     57 
     58c.block = False 
     59 
     60d = c.pushAll(e=30) 
     61d.addCallback(lambda _: c.pullAll('e')) 
     62c.blockOn(d) 
     63 
     64c.block = True 
     65 
     66c['a'] = 30 
     67c['a'] 
     68 
     69for i in c.getIDs(): 
     70    c[i]['a'] = i 
     71 
     72     
     73for i in c.getIDs(): 
     74    print i 
    275 
    376 
    4 !ipcluster -n 4 
     77# getResult 
    578 
     79c.getResultAll() 
     80 
     81# reset 
     82 
     83c.resetAll() 
     84c.getResultAll() 
     85 
     86# keys 
     87 
     88c.resetAll() 
     89c.pushAll(b=59) 
     90c.keysAll() 
     91 
     92# push/pullSerialized 
     93 
     94a = 10 
     95c.pushAll(a=a) 
     96s = c.pullSerialized(0, 'a') 
     97c.pushSerialized(0, a=s) 
     98assert a == c.pull(0, 'a') 
     99 
     100# Queue methods 
     101 
     102# getIDs/verifyTargets 
     103 
     104# scatter/gather 
     105 
     106c.scatterAll('a', range(24)) 
     107c.executeAll('b = [2.0*x for x in a]') 
     108c.gatherAll('b') 
     109 
     110# interactive methods 
     111 
     112c.pushAll(a=10, b=30) 
     113c.iexecuteAll('c=a*b') 
     114c.igetResultAll() 
     115 
     116c.iqueueStatusAll() 
     117 
     118f = open('test.py', 'w') 
     119f.write('import math\nd = 2.0*math.pi') 
     120f.close() 
     121c.runAll('test.py') 
     122print c.ipullAll('d') 
     123 
     124c.mapAll('lambda x: 2.0*x', range(20)) 
     125 
     126p = c.parallelizeAll('lambda x: 2.0*x') 
     127 
     128p(range(20)) 
     129 
  • ipython/branches/saw/ipython1/test/tst_multienginepb.tpl.txt

    r2075 r2094  
    1 %run tst_multiengineclient.py 
     1%run tst_multienginepb.ipy 
  • ipython/branches/saw/ipython1/test/tst_multienginepb.txt

    r2075 r2094  
    22---------------------------------------------------------------------------- 
    33 
    4 Begin included file tst_multiengineclient.py:: 
     4Begin included file tst_multienginepb.ipy:: 
    55 
    6     >>> from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient 
    7      
    8 End included file tst_multiengineclient.py 
    9  
    10 ---------------------------------------------------------------------------- 
    11