1
2
3
4
5
6
7
8
9
10
11
12
13 import sys
14 import logging
15 from itertools import chain
16
17 from Globals import *
18
19 import transaction
20 from twisted.internet import defer, reactor, task
21 from OFS.ObjectManager import ObjectManager
22 from ZODB.POSException import ConflictError
23 from ZEO.Exceptions import ClientDisconnected
24 from ZEO.zrpc.error import DisconnectedError
25 from Products.ZenRelations.ToManyContRelationship import ToManyContRelationship
26 from Products.ZenModel.ZenModelRM import ZenModelRM
27 from Products.ZenUtils.ZCmdBase import ZCmdBase
28 from Products.Zuul.catalog.global_catalog import createGlobalCatalog
29
log = logging.getLogger("zen.Catalog")


# A logging level above CRITICAL (50): a logger set to it emits nothing.
HIGHER_THAN_CRITICAL = 100

# Silence the ZODB connection and ZEO RPC loggers entirely.
for _quiet_name in ('ZODB.Connection', 'ZEO.zrpc'):
    logging.getLogger(_quiet_name).setLevel(HIGHER_THAN_CRITICAL)
del _quiet_name  # keep the loop variable out of the module namespace

# Objects per unit of work: one commit / progress mark per CHUNK_SIZE objects.
CHUNK_SIZE = 100
40 """
41 A special exception that can be yielded during a generator and watched for.
42 This lets us react to connection exceptions in the generator without killing it.
43 """
46
47
def chunk(iterable, callback, reconnect_cb=lambda: None, size=1, delay=1):
    """
    Iterate through a generator, splitting it into chunks of size C{size},
    calling C{callback(chunk)} on each. In case of a
    L{DisconnectedDuringGenerator}, pause for C{delay} seconds, then call
    C{reconnect_cb} and continue with the iteration.

    This is used to walk through the database object by object without dying
    if the database goes away. Retrying on C{ConflictError} is left to
    C{callback}; the iterable itself only signals disconnections.

    @param iterable: source of items. It may yield
        L{DisconnectedDuringGenerator} markers; a marker's C{value} is put
        back at the head of the stream before iteration resumes.
    @param callback: called with each list of items. It may return a
        Deferred, and the next chunk is not assembled until that fires.
    @param reconnect_cb: zero-argument callable run C{delay} seconds after a
        disconnection marker is seen.
    @param size: items per full chunk. A shorter final chunk is delivered
        when the iterable is exhausted.
    @param delay: seconds to wait before calling C{reconnect_cb}.
    @return: the Deferred produced by C{inlineCallbacks}. It fires once the
        iterable is exhausted and every chunk has gone through C{callback}.
        It errbacks if the iterable or the callback raises.
    """
    source = iter(iterable)

    def _fresh():
        # Every chunk gets its own Deferred with the user callback attached.
        pending = defer.Deferred()
        pending.addCallback(callback)
        return pending

    @defer.inlineCallbacks
    def inner(gen=source):
        batch_d = _fresh()
        batch = []
        while True:
            try:
                item = next(gen)
            except StopIteration:
                # Flush the leftovers as a final, possibly short, chunk.
                if batch:
                    batch_d.callback(batch)
                    yield batch_d
                break
            if isinstance(item, DisconnectedDuringGenerator):
                # Re-queue the object that failed so it is retried. Then wait
                # before handing control to the reconnect hook.
                gen = chain((item.value,), gen)
                yield task.deferLater(reactor, delay, reconnect_cb)
                continue
            batch.append(item)
            if len(batch) == size:
                batch_d.callback(batch)
                batch = []
                # Block until the callback (and any Deferred it returned) is
                # done before building the next chunk.
                yield batch_d
                batch_d = _fresh()

    return inner()
105
108 name = 'zencatalog'
109
111 """basic options setup sub classes can add more options here"""
112 ZCmdBase.buildOptions(self)
113 self.parser.add_option("--createcatalog",
114 action="store_true",
115 default=False,
116 help="Create global catalog and populate it")
117 self.parser.add_option("--forceindex",
118 action="store_true",
119 default=False,
120 help="works with --createcatalog to re-create index, if catalog exists it will be "\
121 "dropped first")
122 self.parser.add_option("--reindex",
123 action="store_true",
124 default=False,
125 help="reindex existing catalog")
126 self.parser.add_option("--permissionsOnly",
127 action="store_true",
128 default=False,
129 help="Only works with --reindex, only update the permissions catalog")
130
131
133
134 def stop(ignored):
135 reactor.stop()
136
137 def main():
138 zport = self.dmd.getPhysicalRoot().zport
139 if self.options.createcatalog:
140 d = self._createCatalog(zport)
141 elif self.options.reindex:
142 d = self._reindex(zport)
143 d.addBoth(stop)
144
145 if not self.options.createcatalog and not self.options.reindex:
146 self.parser.error("Must use --createcatalog or --reindex")
147 reactor.callWhenRunning(main)
148 reactor.run()
149
151 globalCat = self._getCatalog(zport)
152
153 if globalCat:
154 reindex_catalog(globalCat, self.options.permissionsOnly, not self.options.daemon)
155 else:
156 log.warning('Global Catalog does not exist, try --createcatalog option')
157 return defer.succeed(None)
158
160
161
162
163 _reconnect = [False]
164
165
166 catalog = self._getCatalog(zport)
167 if self.options.forceindex and catalog:
168 zport._delObject(catalog.getId())
169 catalog = self._getCatalog(zport)
170
171 if catalog is None:
172
173 log.debug("Creating global catalog")
174 createGlobalCatalog(zport)
175 catalog = self._getCatalog(zport)
176 transaction.commit()
177 else:
178 log.info('Global catalog already exists. Run with --forceindex to drop and recreate catalog')
179 return defer.succeed(None)
180
181 def recurse(obj):
182 if _reconnect[0]:
183 log.info('Reconnected.')
184 _reconnect.pop()
185 _reconnect.append(False)
186 try:
187 if isinstance(obj, ObjectManager):
188
189 for ob in obj.objectValues():
190 for kid in recurse(ob):
191 yield kid
192 if isinstance(obj, ZenModelRM):
193 for rel in obj.getRelationships():
194 if not isinstance(rel, ToManyContRelationship):
195 continue
196 for kid in rel.objectValuesGen():
197 for gkid in recurse(kid):
198 yield gkid
199 yield obj
200 except (AttributeError, ClientDisconnected, DisconnectedError):
201
202
203
204
205
206 log.info("Connection problem during object retrieval. "
207 "Trying again in 5 seconds...")
208 _reconnect.pop()
209 _reconnect.append(True)
210 yield DisconnectedDuringGenerator(obj)
211
212 def catalog_object(ob):
213 if hasattr(ob, 'index_object'):
214 ob.index_object()
215 catalog.catalog_object(ob)
216 log.debug('Catalogued object %s' % ob.absolute_url_path())
217
218
219
220 i = [0]
221
222 def handle_chunk(c, d=None, _is_reconnect=False):
223 """
224 Return a Deferred that will call back once the chunk has been
225 catalogued. In case of a conflict or disconnect, wait 5 seconds, then
226 try again. Because this is a callback chained to a C{chunk} Deferred
227 yielded from an C{inlineCallbacks} function, the next chunk will not be
228 created until this completes successfully.
229 """
230 if d is None:
231 d = defer.Deferred()
232 self.syncdb()
233 try:
234 for ob in filter(None, c):
235 catalog_object(ob)
236 transaction.commit()
237 except ConflictError, e:
238 log.info('Conflict error during commit. Retrying...')
239 log.debug('Object in conflict: %r' % (self.app._p_jar[e.oid],))
240 reactor.callLater(0, handle_chunk, c, d)
241 except (ClientDisconnected, DisconnectedError):
242 log.info('Connection problem during commit. '
243 'Trying again in 5 seconds...')
244 reactor.callLater(5, handle_chunk, c, d, True)
245 else:
246 if _is_reconnect:
247 log.info('Reconnected.')
248 d.callback(None)
249
250 i.append(i.pop()+len(c))
251 if self.options.daemon:
252 log.info("Catalogued %s objects" % i[0])
253 else:
254 sys.stdout.write('.')
255 sys.stdout.flush()
256 return d
257
258 def reconnect():
259 """
260 If we had a connection error, the db is probably in a weird state.
261 Reset it and try again.
262 """
263 log.info("Reconnected.")
264 self.syncdb()
265
266 def set_flag(r):
267 """
268 Set a flag in the database saying we've finished indexing.
269 """
270 if self.options.daemon:
271 sys.stdout.write('\n')
272 log.debug("Marking the indexing as completed in the database")
273 self.syncdb()
274 zport._zencatalog_completed = True
275 transaction.commit()
276
277 log.info("Reindexing your system. This may take some time.")
278 d = chunk(recurse(zport), handle_chunk, reconnect, CHUNK_SIZE, 5)
279
280 return d.addCallbacks(set_flag, log.exception)
281
282
284 return getattr(zport, 'global_catalog', None)
285
def reindex_catalog(globalCat, permissionsOnly=False, printProgress=True,
                    commit=True, chunkSize=None):
    """
    Re-catalog every object currently recorded in C{globalCat}.

    Each brain returned by C{globalCat()} is resolved to its object.

      - A brain whose object fails to load (any exception) or resolves to
        C{None} is uncatalogued.
      - Any other object is re-catalogued. Normally this is a full reindex;
        the object's own C{index_object} is called first when it has one.
      - With C{permissionsOnly}, only the C{allowedRolesAndUsers} index is
        refreshed and metadata is left untouched.

    @param globalCat: the global catalog to walk and update.
    @param permissionsOnly: only refresh the permissions index.
    @param printProgress: write a '.' to stdout every C{chunkSize} objects
        (plus a trailing newline at the end) instead of logging the count.
    @param commit: commit the transaction every C{chunkSize} objects and once
        more at the end.
    @param chunkSize: objects per progress report / commit. Defaults to the
        module-level C{CHUNK_SIZE}.
    @raise ValueError: if C{chunkSize} is smaller than 1.
    """
    if chunkSize is None:
        chunkSize = CHUNK_SIZE
    if chunkSize < 1:
        # Guards the modulo below against ZeroDivisionError / never-firing.
        raise ValueError("chunkSize must be >= 1, got %r" % (chunkSize,))

    log.info('reindexing %s in catalog',
             'permissions' if permissionsOnly else 'objects')

    # These keyword arguments are the same for every object, so build them
    # once instead of on each iteration.
    if permissionsOnly:
        catalogKwargs = {'update_metadata': False,
                         'idxs': ("allowedRolesAndUsers",)}
    else:
        catalogKwargs = {}

    catObj = globalCat.catalog_object
    i = 0
    for brain in globalCat():
        path = brain.getPath()
        log.debug('indexing %s', path)
        try:
            obj = brain.getObject()
        except Exception:
            # Best effort: drop an unloadable object from the catalog rather
            # than aborting the whole reindex. It is not counted in `i`.
            log.debug("Could not load object: %s", path)
            globalCat.uncatalog_object(path)
            continue
        if obj is None:
            log.debug('%s does not exists', path)
            globalCat.uncatalog_object(path)
        else:
            if not permissionsOnly and hasattr(obj, 'index_object'):
                obj.index_object()
            catObj(obj, **catalogKwargs)
            log.debug('Catalogued object %s', obj.absolute_url_path())
        i += 1
        if not i % chunkSize:
            if printProgress:
                sys.stdout.write(".")
                sys.stdout.flush()
            else:
                log.info('Catalogued %s objects', i)
            if commit:
                transaction.commit()
    if printProgress:
        sys.stdout.write('\n')
        sys.stdout.flush()
    if commit:
        transaction.commit()
329
if __name__ == "__main__":
    zc = ZenCatalog()
    try:
        zc.run()
    except Exception as e:
        # Record the traceback, then exit non-zero. Previously the script
        # fell through and exited 0, reporting a failed run as a success.
        log.exception(e)
        sys.exit(1)
336