Changeset 2094
- Timestamp:
- 02/13/07 18:18:35 (4 years ago)
- Files:
-
- ipython/branches/saw/doc/ChangeLog (modified) (1 diff)
- ipython/branches/saw/ipython1/kernel/controllerservice.py (modified) (10 diffs)
- ipython/branches/saw/ipython1/kernel/enginepb.py (modified) (4 diffs)
- ipython/branches/saw/ipython1/kernel/multiengineclient.py (modified) (8 diffs)
- ipython/branches/saw/ipython1/test/test_enginepb.py (modified) (1 diff)
- ipython/branches/saw/ipython1/test/test_enginevanilla.py (modified) (1 diff)
- ipython/branches/saw/ipython1/test/tst_multienginepb.ipy (moved) (moved from ipython/branches/saw/ipython1/test/tst_multienginepb.py) (1 diff)
- ipython/branches/saw/ipython1/test/tst_multienginepb.tpl.txt (modified) (1 diff)
- ipython/branches/saw/ipython1/test/tst_multienginepb.txt (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
ipython/branches/saw/doc/ChangeLog
r2063 r2094 1 2007-02-13 Brian Granger <ellisonbg@gmail.com> 2 3 * Now the registerEngine method of the ControllerService takes the 4 ip, port and pid of the engine and logs them to a file in 5 ~/.ipython/log named ipcontroller-###-engine-info.log. This is 6 to make it easier to kill engines. Fixes ticket #111. 7 8 1 9 2007-01-30 Fernando Perez <Fernando.Perez@colorado.edu> 2 10 ipython/branches/saw/ipython1/kernel/controllerservice.py
r2079 r2094 37 37 #------------------------------------------------------------------------------- 38 38 39 import os, sys 40 39 41 from twisted.application import service 40 42 from twisted.internet import defer, reactor … … 48 50 IEngineSerialized, \ 49 51 IEngineQueued 50 52 53 from ipython1.config import cutils 51 54 52 55 #------------------------------------------------------------------------------- … … 64 67 engines = Attribute("A dict of engine ids and engine instances.") 65 68 66 def registerEngine(remoteEngine, id): 69 def registerEngine(remoteEngine, id=None, ip=None, port=None, 70 pid=None): 67 71 """Register new remote engine. 68 72 … … 76 80 def unregisterEngine(id): 77 81 """Handle a disconnecting engine.""" 78 82 79 83 def onRegisterEngineDo(f, includeID, *args, **kwargs): 80 84 """call f with *args and **kwargs when an engine is registered. … … 90 94 def onUnregisterEngineDo(f): 91 95 """stop calling f on unregistration""" 92 93 96 94 97 class IControllerBase(IControllerCore): … … 119 122 120 123 #--------------------------------------------------------------------------- 124 # Methods used to save the engine info to a log file 125 #--------------------------------------------------------------------------- 126 127 def _buildEngineInfoString(self, id, ip, port, pid): 128 if id is None: 129 id = -99 130 if ip is None: 131 ip = "-99" 132 if port is None: 133 port = -99 134 if pid is None: 135 pid = -99 136 return "Engine Info: %d %s %d %d" % (id, ip , port, pid) 137 138 def _logEngineInfo(self, id, ip, port, pid): 139 log.msg(self._buildEngineInfoString(id,ip,port,pid)) 140 141 def _getEngineInfoLogFile(self): 142 # Store all logs inside the ipython directory 143 ipdir = cutils.getIpythonDir() 144 pjoin = os.path.join 145 logdir_base = pjoin(ipdir,'log') 146 if not os.path.isdir(logdir_base): 147 os.makedirs(logdir_base) 148 logfile = os.path.join(logdir_base,'ipcontroller-%s-engine-info.log' % os.getpid()) 149 return logfile 150 151 def _logEngineInfoToFile(self, id, ip, port, pid): 152 """Log info about an engine to a log file. 153 154 When an engine registers with a ControllerService, the ControllerService 155 saves information about the engine to a log file. That information 156 can be useful for various purposes, such as killing hung engines, etc. 157 158 This method takes the assigned id, ip/port and pid of the engine 159 and saves it to a file of the form: 160 161 ~/.ipython/log/ipcontroller-###-engine-info.log 162 163 where ### is the pid of the controller. 164 165 Each line of this file has the form: 166 167 Engine Info: ip ip port pid 168 169 If any of the entries are not known, they are replaced by -99. 170 """ 171 172 fname = self._getEngineInfoLogFile() 173 f = open(fname, 'a') 174 s = self._buildEngineInfoString(id,ip,port,pid) 175 f.write(s + '\n') 176 f.close() 177 178 #--------------------------------------------------------------------------- 121 179 # IControllerCore methods 122 180 #--------------------------------------------------------------------------- 123 181 124 def registerEngine(self, remoteEngine, id=None): 182 def registerEngine(self, remoteEngine, id=None, 183 ip=None, port=None, pid=None): 125 184 """Register new engine connection""" 126 185 … … 132 191 assert IEngineQueued.providedBy(remoteEngine), \ 133 192 "engine passed to registerEngine doesn't provide IEngineQueued" 134 193 assert isinstance(id, int) or id is None, \ 194 "id to registerEngine must be an integer or None" 195 assert isinstance(ip, str) or ip is None, \ 196 "ip to registerEngine must be a string or None" 197 assert isinstance(port, int) or port is None, \ 198 "port to registerEngine must be an integer or None" 199 assert isinstance(pid, int) or pid is None, \ 200 "pid to registerEngine must be an integer or None" 201 135 202 desiredID = id 136 203 if desiredID in self.engines.keys(): … … 145 212 remoteEngine.service = self 146 213 self.engines[getID] = remoteEngine 214 215 # Log the Engine Information for monitoring purposes 216 self._logEngineInfoToFile(getID, ip, port, pid) 217 147 218 msg = "registered engine: %i" %getID 148 219 log.msg(msg) … … 176 247 else: 177 248 log.msg("preserving id %i" %id) 178 249 179 250 for i in range(len(self._onUnregister)): 180 251 (f,args,kwargs,ifid) = self._onUnregister[i] … … 231 302 self.engines = self.controller.engines 232 303 233 def registerEngine(self, remoteEngine, id=None): 234 return self.controller.registerEngine(remoteEngine, id) 304 def registerEngine(self, remoteEngine, id=None, 305 ip=None, port=None, pid=None): 306 return self.controller.registerEngine(remoteEngine, 307 id, ip, port, pid) 235 308 236 309 def unregisterEngine(self, id): ipython/branches/saw/ipython1/kernel/enginepb.py
r2069 r2094 34 34 #------------------------------------------------------------------------------- 35 35 36 import os 36 37 import cPickle as pickle 37 38 … … 172 173 173 174 self.rootObject = obj 174 d = self.rootObject.callRemote('registerEngine', self.engineReference, None) 175 # Now register myself with the controller 176 desiredID = self.service.id 177 d = self.rootObject.callRemote('registerEngine', self.engineReference, 178 desiredID, os.getpid()) 175 179 return d.addCallbacks(self._referenceSent, self._getRootFailure) 176 180 … … 631 635 """ 632 636 633 def remote_registerEngine(self, engineReference ):637 def remote_registerEngine(self, engineReference, id=None, pid=None): 634 638 """Register new engine on controller.""" 635 639 … … 649 653 self.service = service 650 654 651 def remote_registerEngine(self, engineReference, id , *interfaces):655 def remote_registerEngine(self, engineReference, id=None, pid=None): 652 656 # First adapt the engineReference to a basic non-queued engine 653 657 engine = IEngineBase(engineReference) 654 658 # Make it an IQueuedEngine before registration 655 659 remoteEngine = IEngineQueued(engine) 656 regDict = self.service.registerEngine(remoteEngine, id) 660 # Get the ip/port of the remote side 661 peerAddress = engineReference.broker.transport.getPeer() 662 ip = peerAddress.host 663 port = peerAddress.port 664 regDict = self.service.registerEngine(remoteEngine, id, ip, port, pid) 657 665 # Now setup callback for disconnect and unregistering the engine 658 666 def notify(*args): ipython/branches/saw/ipython1/kernel/multiengineclient.py
r2067 r2094 171 171 return self._blockOrNot(d) 172 172 173 def scatterAll(self, key, seq, style='basic', flatten=False): 174 return self.scatter('all', key, seq, style, flatten) 175 173 176 def gather(self, targets, key, style='basic'): 174 177 self.connect() … … 176 179 return self._blockOrNot(d) 177 180 181 def gatherAll(self, key, style='basic'): 182 return self.gather('all', key, style) 178 183 179 184 #------------------------------------------------------------------------------- … … 329 334 :return: ``True`` or ``False`` to indicate success or failure. 330 335 """ 331 332 336 fileobj = open(fname,'r') 333 337 source = fileobj.read() … … 339 343 # Now run the code 340 344 return self.execute(targets, source) 341 345 342 346 def runAll(self, fname): 343 347 """Run a .py file on all engines. … … 397 401 - `id`: A string representing the key. 398 402 """ 399 400 403 if isinstance(id, slice): 401 404 return InteractiveMultiEngineClientView(self, id) … … 403 406 return EngineProxy(self, id) 404 407 elif isinstance(id, str): 408 self.connect() 405 409 return self.pull('all', *(id,)) 406 410 else: … … 409 413 def __len__(self): 410 414 """Return the number of available engines.""" 411 d = self.self.getIDs() 415 saveBlock = self.block 416 self.block = False 417 d = self.getIDs() 412 418 d.addCallback(len) 413 return self.blockOrNot(d) 419 self.block = saveBlock 420 return self._blockOrNot(d) 414 421 415 422 def map(self, targets, functionSource, seq, style='basic'): … … 445 452 '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % \ 446 453 functionSource 447 d = self.scatter(targets, '_ipython_map_seq', seq, style='basic') 448 d.addCallback(lambda _: self.execute(targets, sourceToRun)) 449 d.addCallback(lambda _: self.gather(targets, '_ipython_map_seq_result', style='basic')) 454 d1 = self.scatter(targets, '_ipython_map_seq', seq, style) 455 d2 = self.execute(targets, sourceToRun) 456 d3 = self.gather(targets, '_ipython_map_seq_result', style) 457 d = gatherBoth([d1 ,d2, d3], fireOnOneErrback=1, consumeErrors=1) 458 d.addCallback(lambda r: r[2]) 450 459 self.block = saveBlock 451 460 return self._blockOrNot(d) ipython/branches/saw/ipython1/test/test_enginepb.py
r2057 r2094 91 91 #--------------------------------------------------------------------------- 92 92 93 def registerEngine(self, remoteEngine, id ):93 def registerEngine(self, remoteEngine, id=None, ip=None, port=None, pid=None): 94 94 self.engine = remoteEngine 95 95 # This fires the callbackchain to allow the tests to run ipython/branches/saw/ipython1/test/test_enginevanilla.py
r2057 r2094 90 90 MAX_MESSAGE_SIZE = 2*640*1024 91 91 92 def registerEngine(self, remoteEngine, id ):92 def registerEngine(self, remoteEngine, id=None, ip=None, port=None, pid=None): 93 93 self.engine = remoteEngine 94 94 self.engine.id = 0 ipython/branches/saw/ipython1/test/tst_multienginepb.ipy
r2075 r2094 1 from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient 1 import time 2 3 from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient as client 4 from ipython1.kernel import magic 5 6 # Setup 7 8 c = client(('127.0.0.1',10111)) 9 c.connect() 10 c.activate() 11 c.block = True 12 assert len(c) >= 4 13 14 # Execute 15 16 c.executeAll('a=5') 17 c.execute([0,2],'b=10') 18 c.execute([1,3],'b=20') 19 c.executeAll('c=a*b') 20 %result 21 22 c.block=False 23 24 d = c.executeAll('import time') 25 d.addCallback(lambda _: c.executeAll('time.sleep(1)')) 26 d.addCallback(lambda _: c.executeAll('import math')) 27 c.blockOn(d) 28 d = c.executeAll('a = 1.0/0') 29 d.addErrback(lambda f: None) 30 c.blockOn(d) 31 32 c.block=True 33 34 %px a = 5 35 %px print a 36 %pn 0 b = 10 37 %pn 0 print b 38 39 %autopx 40 41 def f(x): 42 return x*2 43 44 45 print f(1000000000) 46 47 %autopx 48 49 # Push/Pull 50 51 c.pushAll(a=10, b=20) 52 c.pullAll('a', 'b') 53 c.pullAll('a') 54 c.push([0,1], c=range(5)) 55 c.push([2,3], c=range(5)) 56 c.pullAll('c') 57 58 c.block = False 59 60 d = c.pushAll(e=30) 61 d.addCallback(lambda _: c.pullAll('e')) 62 c.blockOn(d) 63 64 c.block = True 65 66 c['a'] = 30 67 c['a'] 68 69 for i in c.getIDs(): 70 c[i]['a'] = i 71 72 73 for i in c.getIDs(): 74 print i 2 75 3 76 4 !ipcluster -n 4 77 # getResult 5 78 79 c.getResultAll() 80 81 # reset 82 83 c.resetAll() 84 c.getResultAll() 85 86 # keys 87 88 c.resetAll() 89 c.pushAll(b=59) 90 c.keysAll() 91 92 # push/pullSerialized 93 94 a = 10 95 c.pushAll(a=a) 96 s = c.pullSerialized(0, 'a') 97 c.pushSerialized(0, a=s) 98 assert a == c.pull(0, 'a') 99 100 # Queue methods 101 102 # getIDs/verifyTargets 103 104 # scatter/gather 105 106 c.scatterAll('a', range(24)) 107 c.executeAll('b = [2.0*x for x in a]') 108 c.gatherAll('b') 109 110 # interactive methods 111 112 c.pushAll(a=10, b=30) 113 c.iexecuteAll('c=a*b') 114 c.igetResultAll() 115 116 c.iqueueStatusAll() 117 118 f = open('test.py', 'w') 119 f.write('import math\nd = 2.0*math.pi') 120 f.close() 121 c.runAll('test.py') 122 print c.ipullAll('d') 123 124 c.mapAll('lambda x: 2.0*x', range(20)) 125 126 p = c.parallelizeAll('lambda x: 2.0*x') 127 128 p(range(20)) 129 ipython/branches/saw/ipython1/test/tst_multienginepb.tpl.txt
r2075 r2094 1 %run tst_multiengine client.py1 %run tst_multienginepb.ipy ipython/branches/saw/ipython1/test/tst_multienginepb.txt
r2075 r2094 2 2 ---------------------------------------------------------------------------- 3 3 4 Begin included file tst_multiengine client.py::4 Begin included file tst_multienginepb.ipy:: 5 5 6 >>> from ipython1.kernel.multienginepb import PBInteractiveMultiEngineClient7 8 End included file tst_multiengineclient.py9 10 ----------------------------------------------------------------------------11
