Package Products :: Package ZenUtils :: Module zencatalog
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenUtils.zencatalog

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  import sys 
 14  import logging 
 15  from itertools import chain 
 16   
 17  from Globals import * 
 18   
 19  import transaction 
 20  from twisted.internet import defer, reactor, task 
 21  from OFS.ObjectManager import ObjectManager 
 22  from ZODB.POSException import ConflictError 
 23  from ZEO.Exceptions import ClientDisconnected 
 24  from ZEO.zrpc.error import DisconnectedError 
 25  from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship 
 26  from Products.ZenModel.ZenModelRM import ZenModelRM 
 27  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 28  from Products.Zuul.catalog.global_catalog import createGlobalCatalog 
 29   
 30  log = logging.getLogger("zen.Catalog") 
 31   
 32  # Hide connection errors. We handle them all ourselves. 
 33  HIGHER_THAN_CRITICAL = 100 
 34  logging.getLogger('ZODB.Connection').setLevel(HIGHER_THAN_CRITICAL) 
 35  logging.getLogger('ZEO.zrpc').setLevel(HIGHER_THAN_CRITICAL) 
 36   
 37  CHUNK_SIZE = 100 
class DisconnectedDuringGenerator(Exception):
    """
    A special exception that can be yielded during a generator and watched for.
    This lets us react to connection exceptions in the generator without
    killing it.

    @param value: the item that was being processed when the problem
        occurred; L{chunk} pushes it back in front of the iterator so it is
        still handed on once iteration resumes.
    """
    def __init__(self, value):
        # Forward value to Exception so args/str()/repr() carry it, and so
        # pickling/copying (which re-calls cls(*args)) works.
        super(DisconnectedDuringGenerator, self).__init__(value)
        self.value = value
46
def chunk(iterable, callback, reconnect_cb=lambda: None, size=1, delay=1):
    """
    Iterate through a generator, splitting it into chunks of size C{size},
    calling C{callback(chunk)} on each. In case of a
    L{DisconnectedDuringGenerator}, pause for C{delay} seconds, then call
    C{reconnect_cb} and continue with the iteration.

    This is used to walk through the database object by object without dying
    if the database goes away or there's a C{ConflictError}.

    @param iterable: source of values; it may yield
        L{DisconnectedDuringGenerator} instances to request a pause. The
        instance's C{value} is then processed as the next item.
    @param callback: called with each list of (at most) C{size} values; it is
        added as a callback on a C{Deferred}, so it may return a C{Deferred}
        and the next chunk is not built until that fires.
    @param reconnect_cb: zero-argument callable run after each pause.
    @param size: values per chunk; the final chunk may be shorter.
    @param delay: seconds to wait before calling C{reconnect_cb}.
    @return: the C{Deferred} produced by the C{inlineCallbacks} worker; it
        fires once the iterable is exhausted and every chunk was handled.
    """
    gen = iter(iterable)

    # defer.inlineCallbacks means that Deferreds yielded from the function
    # will execute their callbacks /in order/, blocking each other.
    @defer.inlineCallbacks
    def inner():
        # Values pushed back by a DisconnectedDuringGenerator; drained before
        # gen is advanced again. (Previously gen was re-wrapped in a new
        # itertools.chain on every disconnect, nesting without bound.)
        pushed_back = []
        d = defer.Deferred()
        d.addCallback(callback)
        batch = []
        while True:
            if pushed_back:
                n = pushed_back.pop()
            else:
                try:
                    # next() builtin rather than the Python-2-only gen.next()
                    n = next(gen)
                except StopIteration:
                    # Exhausted: hand over the possibly incomplete chunk.
                    if batch:
                        d.callback(batch)
                        yield d
                    break
            if isinstance(n, DisconnectedDuringGenerator):
                # Retry the item that broke once we have paused, and block
                # the iteration until reconnect_cb has run after `delay`.
                pushed_back.append(n.value)
                yield task.deferLater(reactor, delay, reconnect_cb)
            else:
                batch.append(n)
            # A full chunk: fire it, wait for it, and start a new one.
            # batch is rebound (not cleared) because callback may keep it.
            if len(batch) == size:
                d.callback(batch)
                batch = []
                yield d
                d = defer.Deferred()
                d.addCallback(callback)

    return inner()
class ZenCatalog(ZCmdBase):
    """
    Command-line tool for the global catalog (C{zport.global_catalog}):
    C{--createcatalog} creates and populates it, C{--reindex} reindexes an
    existing one. The work runs inside the Twisted reactor.
    """
    name = 'zencatalog'

    def buildOptions(self):
        """basic options setup sub classes can add more options here"""
        ZCmdBase.buildOptions(self)
        self.parser.add_option("--createcatalog",
                               action="store_true",
                               default=False,
                               help="Create global catalog and populate it")
        self.parser.add_option("--forceindex",
                               action="store_true",
                               default=False,
                               help="works with --createcatalog to re-create index, if catalog exists it will be "
                                    "dropped first")
        self.parser.add_option("--reindex",
                               action="store_true",
                               default=False,
                               help="reindex existing catalog")
        self.parser.add_option("--permissionsOnly",
                               action="store_true",
                               default=False,
                               help="Only works with --reindex, only update the permissions catalog")

    def run(self):
        """
        Validate the options, run the requested operation inside the reactor
        and stop the reactor once it has finished (successfully or not).
        """
        if not self.options.createcatalog and not self.options.reindex:
            self.parser.error("Must use --createcatalog or --reindex")

        def work():
            zport = self.dmd.getPhysicalRoot().zport
            if self.options.createcatalog:
                return self._createCatalog(zport)
            return self._reindex(zport)

        def stop(ignored):
            reactor.stop()

        def main():
            # maybeDeferred turns a synchronous exception into a failed
            # Deferred, so stop() still runs instead of the process hanging
            # with the reactor running forever.
            d = defer.maybeDeferred(work)
            d.addErrback(self._logFailure)
            d.addBoth(stop)

        reactor.callWhenRunning(main)
        reactor.run()

    def _logFailure(self, failure):
        """
        Errback: log C{failure} with its traceback and consume it, so later
        callbacks in the chain receive None.
        """
        log.error("zencatalog failed:\n%s", failure.getTraceback())

    def _reindex(self, zport):
        """
        Reindex the existing global catalog (synchronously).

        @return: an already-fired C{Deferred}.
        """
        globalCat = self._getCatalog(zport)
        # "is not None": catalog objects may define __len__, so an existing
        # but empty catalog would test false and be reported as missing.
        if globalCat is not None:
            reindex_catalog(globalCat, self.options.permissionsOnly,
                            not self.options.daemon)
        else:
            log.warning('Global Catalog does not exist, try --createcatalog option')
        return defer.succeed(None)

    def _createCatalog(self, zport):
        """
        Create the global catalog (dropping an existing one first when
        --forceindex is given) and populate it by walking every object
        below C{zport}, C{CHUNK_SIZE} objects per transaction. Connection
        problems and conflicts pause/retry instead of aborting the run.

        @return: an already-fired C{Deferred} if the catalog exists and is
            not being recreated; otherwise a C{Deferred} that fires after
            population finished and C{zport._zencatalog_completed} was set.
        """
        catalog = self._getCatalog(zport)
        # "is not None": an existing but empty catalog may test false (it can
        # define __len__), which used to make --forceindex a no-op for it.
        if self.options.forceindex and catalog is not None:
            zport._delObject(catalog.getId())
            catalog = self._getCatalog(zport)

        if catalog is not None:
            log.info('Global catalog already exists. Run with --forceindex to drop and recreate catalog')
            return defer.succeed(None)

        log.debug("Creating global catalog")
        createGlobalCatalog(zport)
        catalog = self._getCatalog(zport)
        transaction.commit()

        # One-element lists stand in for variables the nested functions below
        # must rebind (Python 2 has no "nonlocal").
        reconnecting = [False]  # recurse() hit a connection problem
        catalogued = [0]        # objects committed so far

        def recurse(obj):
            """
            Yield every object below C{obj} (children first), then C{obj}.

            Follows ObjectManager children and ZenModelRM to-many containing
            relationships. On a connection problem -- or an AttributeError,
            which ZenPacks being updated during the run can cause -- yield a
            L{DisconnectedDuringGenerator} for C{obj} so L{chunk} pauses and
            waits for a connection, then continues from C{obj}.
            """
            if reconnecting[0]:
                log.info('Reconnected.')
                reconnecting[0] = False
            try:
                if isinstance(obj, ObjectManager):
                    # Bottom up, for multiple-path efficiency
                    for ob in obj.objectValues():
                        for kid in recurse(ob):
                            yield kid
                if isinstance(obj, ZenModelRM):
                    for rel in obj.getRelationships():
                        if isinstance(rel, ToManyContRelationship):
                            for kid in rel.objectValuesGen():
                                for gkid in recurse(kid):
                                    yield gkid
                yield obj
            except (AttributeError, ClientDisconnected, DisconnectedError):
                log.info("Connection problem during object retrieval. "
                         "Trying again in 5 seconds...")
                reconnecting[0] = True
                yield DisconnectedDuringGenerator(obj)

        def catalog_object(ob):
            """Run C{ob}'s own index hook, if any, then add it to the catalog."""
            if hasattr(ob, 'index_object'):
                ob.index_object()
            catalog.catalog_object(ob)
            # Guarded: absolute_url_path() is only worth calling for debug.
            if log.isEnabledFor(logging.DEBUG):
                log.debug('Catalogued object %s', ob.absolute_url_path())

        def log_conflict(error):
            """Debug-log the object a ConflictError refers to, best effort."""
            if not log.isEnabledFor(logging.DEBUG):
                return
            oid = getattr(error, 'oid', None)
            try:
                culprit = self.app._p_jar[oid]
            except Exception:
                # oid may be None or unloadable; fall back to the raw oid.
                culprit = oid
            log.debug('Object in conflict: %r', culprit)

        def handle_chunk(c, d=None, _is_reconnect=False):
            """
            Catalog the objects in chunk C{c} and commit; return a Deferred
            that fires once that has succeeded. On a C{ConflictError} the
            chunk is retried immediately; on a disconnect it is retried after
            5 seconds. Because this is a callback chained to a C{chunk}
            Deferred yielded from an C{inlineCallbacks} function, the next
            chunk will not be created until this completes successfully.

            @param d: the Deferred to fire; passed back in on retries.
            @param _is_reconnect: true when retrying after a disconnect.
            """
            if d is None:
                d = defer.Deferred()
            self.syncdb()
            try:
                for ob in filter(None, c):
                    catalog_object(ob)
                transaction.commit()
            except ConflictError as e:
                log.info('Conflict error during commit. Retrying...')
                # Schedule the retry first: if anything after this raised,
                # d would never fire and the whole run would stall.
                reactor.callLater(0, handle_chunk, c, d)
                log_conflict(e)
            except (ClientDisconnected, DisconnectedError):
                log.info('Connection problem during commit. '
                         'Trying again in 5 seconds...')
                reactor.callLater(5, handle_chunk, c, d, True)
            else:
                if _is_reconnect:
                    log.info('Reconnected.')
                # Report progress before firing d: on a retry, firing it
                # resumes chunk(), which may go on to process more chunks
                # synchronously before control returns here.
                catalogued[0] += len(c)
                if self.options.daemon:
                    log.info("Catalogued %s objects" % catalogued[0])
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                d.callback(None)
            return d

        def reconnect():
            """
            If we had a connection error, the db is probably in a weird state.
            Reset it and try again.
            """
            log.info("Reconnected.")
            self.syncdb()

        def set_flag(r):
            """
            Set a flag in the database saying we've finished indexing.
            """
            # Progress dots are printed only when not running as a daemon
            # (see handle_chunk), so that is when the line must be ended.
            if not self.options.daemon:
                sys.stdout.write('\n')
                sys.stdout.flush()
            log.debug("Marking the indexing as completed in the database")
            self.syncdb()
            zport._zencatalog_completed = True
            transaction.commit()

        log.info("Reindexing your system. This may take some time.")
        d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)
        return d.addCallbacks(set_flag, self._logFailure)

    def _getCatalog(self, zport):
        """Return C{zport.global_catalog}, or None if it does not exist."""
        return getattr(zport, 'global_catalog', None)
285
def reindex_catalog(globalCat, permissionsOnly=False, printProgress=True, commit=True):
    """
    Re-catalog every object currently recorded in C{globalCat}.

    Entries whose object cannot be loaded, or loads as None, are removed
    from the catalog instead.

    @param globalCat: the catalog to walk; it is called with no arguments to
        list its brains.
    @param permissionsOnly: if true, only the C{allowedRolesAndUsers} index
        is updated and metadata is left alone; otherwise all indexes are
        updated and each object's own C{index_object} hook (if any) is
        called first.
    @param printProgress: write a dot to stdout every C{CHUNK_SIZE}
        processed objects and a newline at the end; otherwise log a running
        count at INFO level.
    @param commit: commit the transaction every C{CHUNK_SIZE} processed
        objects and once more at the end.
    """
    if permissionsOnly:
        what = 'permissions'
        # Restrict catalog_object to the security index and skip metadata.
        catalog_kwargs = {'update_metadata': False,
                          'idxs': ("allowedRolesAndUsers",)}
    else:
        what = 'objects'
        # No idxs given: catalog_object updates all indexes.
        catalog_kwargs = {}
    log.info('reindexing %s in catalog', what)

    # absolute_url_path() is only needed for a debug message; skip the call
    # entirely unless debug logging is enabled.
    debug = log.isEnabledFor(logging.DEBUG)
    count = 0
    catObj = globalCat.catalog_object
    for brain in globalCat():
        path = brain.getPath()
        log.debug('indexing %s', path)
        try:
            obj = brain.getObject()
        except Exception:
            # Best effort: drop the broken entry and keep going. Such entries
            # do not count towards the progress/commit interval.
            log.debug("Could not load object: %s", path, exc_info=True)
            globalCat.uncatalog_object(path)
            continue
        if obj is None:
            log.debug('%s does not exists', path)
            globalCat.uncatalog_object(path)
        else:
            if not permissionsOnly and hasattr(obj, 'index_object'):
                obj.index_object()
            catObj(obj, **catalog_kwargs)
            if debug:
                log.debug('Catalogued object %s', obj.absolute_url_path())
        count += 1
        if count % CHUNK_SIZE == 0:
            if printProgress:
                sys.stdout.write(".")
                sys.stdout.flush()
            else:
                log.info('Catalogued %s objects', count)
            if commit:
                transaction.commit()
    if printProgress:
        sys.stdout.write('\n')
        sys.stdout.flush()
    if commit:
        transaction.commit()
if __name__ == "__main__":
    zc = ZenCatalog()
    try:
        zc.run()
    except Exception:
        # Log the traceback, then exit non-zero so whoever launched the
        # command can tell that the run failed.
        log.exception("zencatalog failed")
        sys.exit(1)