1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.frontier;
24
25 import java.io.UnsupportedEncodingException;
26 import java.math.BigInteger;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31 import java.util.regex.Pattern;
32
33 import org.apache.commons.collections.Closure;
34 import org.archive.crawler.datamodel.CrawlURI;
35 import org.archive.crawler.framework.FrontierMarker;
36 import org.archive.util.ArchiveUtils;
37
38 import com.sleepycat.bind.serial.StoredClassCatalog;
39 import com.sleepycat.je.Cursor;
40 import com.sleepycat.je.Database;
41 import com.sleepycat.je.DatabaseConfig;
42 import com.sleepycat.je.DatabaseEntry;
43 import com.sleepycat.je.DatabaseException;
44 import com.sleepycat.je.DatabaseNotFoundException;
45 import com.sleepycat.je.Environment;
46 import com.sleepycat.je.OperationStatus;
47 import com.sleepycat.util.RuntimeExceptionWrapper;
48
49
50 /***
51 * A BerkeleyDB-database-backed structure for holding ordered
52 * groupings of CrawlURIs. Reading the groupings from specific
53 * per-grouping (per-classKey/per-Host) starting points allows
54 * this to act as a collection of independent queues.
55 *
56 * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}.
57 *
58 * <p>TODO: refactor, improve naming.
59 *
60 * @author gojomo
61 */
62 public class BdbMultipleWorkQueues {
/**
 * Version id computed by {@link ArchiveUtils#classnameBasedUID} from this
 * class and the version number 1.
 * NOTE(review): this class does not declare <code>Serializable</code> in
 * this file -- confirm whether the id is still needed.
 */
private static final long serialVersionUID = ArchiveUtils
    .classnameBasedUID(BdbMultipleWorkQueues.class, 1);

/** Logger named after this class. */
private static final Logger LOGGER =
    Logger.getLogger(BdbMultipleWorkQueues.class.getName());

/**
 * Database holding all pending URIs, grouped in virtual queues.
 * Assigned in the constructor (opened under the name "pending").
 */
private Database pendingUrisDB = null;

/**
 * Supporting bdb serialization of CrawlURIs; used for every conversion
 * between a CrawlURI and its stored database value.
 */
private RecyclingSerialBinding crawlUriBinding;
74
75 /***
76 * Create the multi queue in the given environment.
77 *
78 * @param env bdb environment to use
79 * @param classCatalog Class catalog to use.
80 * @param recycle True if we are to reuse db content if any.
81 * @throws DatabaseException
82 */
83 public BdbMultipleWorkQueues(Environment env,
84 StoredClassCatalog classCatalog, final boolean recycle)
85 throws DatabaseException {
86
87 DatabaseConfig dbConfig = new DatabaseConfig();
88 dbConfig.setAllowCreate(true);
89 if (!recycle) {
90 try {
91 env.truncateDatabase(null, "pending", false);
92 } catch (DatabaseNotFoundException e) {
93
94 }
95 }
96
97
98 dbConfig.setDeferredWrite(true);
99
100 this.pendingUrisDB = env.openDatabase(null, "pending", dbConfig);
101 crawlUriBinding =
102 new RecyclingSerialBinding(classCatalog, CrawlURI.class);
103 }
104
105 /***
106 * Delete all CrawlURIs matching the given expression.
107 *
108 * @param match
109 * @param queue
110 * @param headKey
111 * @return count of deleted items
112 * @throws DatabaseException
113 * @throws DatabaseException
114 */
115 public long deleteMatchingFromQueue(String match, String queue,
116 DatabaseEntry headKey) throws DatabaseException {
117 long deletedCount = 0;
118 Pattern pattern = Pattern.compile(match);
119 DatabaseEntry key = headKey;
120 DatabaseEntry value = new DatabaseEntry();
121 Cursor cursor = null;
122 try {
123 cursor = pendingUrisDB.openCursor(null, null);
124 OperationStatus result = cursor.getSearchKeyRange(headKey,
125 value, null);
126
127 while (result == OperationStatus.SUCCESS) {
128 if(value.getData().length>0) {
129 CrawlURI curi = (CrawlURI) crawlUriBinding
130 .entryToObject(value);
131 if (!curi.getClassKey().equals(queue)) {
132
133 break;
134 }
135 if (pattern.matcher(curi.toString()).matches()) {
136 cursor.delete();
137 deletedCount++;
138 }
139 }
140 result = cursor.getNext(key, value, null);
141 }
142 } finally {
143 if (cursor != null) {
144 cursor.close();
145 }
146 }
147
148 return deletedCount;
149 }
150
151 /***
152 * @param m marker
153 * @param maxMatches
154 * @return list of matches starting from marker position
155 * @throws DatabaseException
156 */
157 public List getFrom(FrontierMarker m, int maxMatches) throws DatabaseException {
158 int matches = 0;
159 int tries = 0;
160 ArrayList<CrawlURI> results = new ArrayList<CrawlURI>(maxMatches);
161 BdbFrontierMarker marker = (BdbFrontierMarker) m;
162
163 DatabaseEntry key = marker.getStartKey();
164 DatabaseEntry value = new DatabaseEntry();
165
166 if (key != null) {
167 Cursor cursor = null;
168 OperationStatus result = null;
169 try {
170 cursor = pendingUrisDB.openCursor(null,null);
171
172
173
174 result = cursor.getSearchKey(key, value, null);
175
176 while(matches<maxMatches && result == OperationStatus.SUCCESS) {
177 if(value.getData().length>0) {
178 CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value);
179 if(marker.accepts(curi)) {
180 results.add(curi);
181 matches++;
182 }
183 tries++;
184 }
185 result = cursor.getNext(key,value,null);
186 }
187 } finally {
188 if (cursor !=null) {
189 cursor.close();
190 }
191 }
192
193 if(result != OperationStatus.SUCCESS) {
194
195 marker.setStartKey(null);
196 }
197 }
198 return results;
199 }
200
201 /***
202 * Get a marker for beginning a scan over all contents
203 *
204 * @param regexpr
205 * @return a marker pointing to the first item
206 */
207 public FrontierMarker getInitialMarker(String regexpr) {
208 try {
209 return new BdbFrontierMarker(getFirstKey(), regexpr);
210 } catch (DatabaseException e) {
211 e.printStackTrace();
212 return null;
213 }
214 }
215
216 /***
217 * @return the key to the first item in the database
218 * @throws DatabaseException
219 */
220 protected DatabaseEntry getFirstKey() throws DatabaseException {
221 DatabaseEntry key = new DatabaseEntry();
222 DatabaseEntry value = new DatabaseEntry();
223 Cursor cursor = pendingUrisDB.openCursor(null,null);
224 OperationStatus status = cursor.getNext(key,value,null);
225 cursor.close();
226 if(status == OperationStatus.SUCCESS) {
227 return key;
228 }
229 return null;
230 }
231
232 /***
233 * Get the next nearest item after the given key. Relies on
234 * external discipline -- we'll look at the queues count of how many
235 * items it has -- to avoid asking for something from a
236 * range where there are no associated items --
237 * otherwise could get first item of next 'queue' by mistake.
238 *
239 * <p>TODO: hold within a queue's range
240 *
241 * @param headKey Key prefix that demarks the beginning of the range
242 * in <code>pendingUrisDB</code> we're interested in.
243 * @return CrawlURI.
244 * @throws DatabaseException
245 */
246 public CrawlURI get(DatabaseEntry headKey)
247 throws DatabaseException {
248 DatabaseEntry result = new DatabaseEntry();
249
250
251
252
253
254
255
256
257
258 OperationStatus status = getNextNearestItem(headKey, result);
259 CrawlURI retVal = null;
260 if (status != OperationStatus.SUCCESS) {
261 LOGGER.severe("See '1219854 NPE je-2.0 "
262 + "entryToObject...'. OperationStatus "
263 + " was not SUCCESS: "
264 + status
265 + ", headKey "
266 + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
267 return null;
268 }
269 try {
270 retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
271 } catch (RuntimeExceptionWrapper rw) {
272 LOGGER.log(
273 Level.SEVERE,
274 "expected object missing in queue " +
275 BdbWorkQueue.getPrefixClassKey(headKey.getData()),
276 rw);
277 return null;
278 }
279 retVal.setHolderKey(headKey);
280 return retVal;
281 }
282
283 protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
284 DatabaseEntry result) throws DatabaseException {
285 Cursor cursor = null;
286 OperationStatus status;
287 try {
288 cursor = this.pendingUrisDB.openCursor(null, null);
289
290
291 status = cursor.getSearchKey(headKey, result, null);
292 if (status != OperationStatus.SUCCESS) {
293 LOGGER.severe("bdb queue cap missing: "
294 + status.toString() + " " + new String(headKey.getData()));
295 return status;
296 }
297 if (result.getData().length > 0) {
298 LOGGER.severe("bdb queue has nonzero size: "
299 + result.getData().length);
300 return OperationStatus.KEYEXIST;
301 }
302
303 status = cursor.getNext(headKey,result,null);
304 } finally {
305 if(cursor!=null) {
306 cursor.close();
307 }
308 }
309 return status;
310 }
311
312 /***
313 * Put the given CrawlURI in at the appropriate place.
314 *
315 * @param curi
316 * @throws DatabaseException
317 */
318 public void put(CrawlURI curi, boolean overwriteIfPresent)
319 throws DatabaseException {
320 DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
321 if (insertKey == null) {
322 insertKey = calculateInsertKey(curi);
323 curi.setHolderKey(insertKey);
324 }
325 DatabaseEntry value = new DatabaseEntry();
326 crawlUriBinding.objectToEntry(curi, value);
327
328 if (LOGGER.isLoggable(Level.FINE)) {
329 tallyAverageEntrySize(curi, value);
330 }
331 OperationStatus status;
332 if(overwriteIfPresent) {
333 status = pendingUrisDB.put(null, insertKey, value);
334 } else {
335 status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
336 }
337 if(status!=OperationStatus.SUCCESS) {
338 LOGGER.severe("failed; "+status+ " "+curi);
339 }
340 }
341
342 private long entryCount = 0;
343 private long entrySizeSum = 0;
344 private int largestEntry = 0;
345
346 /***
347 * Log average size of database entry.
348 * @param curi CrawlURI this entry is for.
349 * @param value Database entry value.
350 */
351 private synchronized void tallyAverageEntrySize(CrawlURI curi,
352 DatabaseEntry value) {
353 entryCount++;
354 int length = value.getData().length;
355 entrySizeSum += length;
356 int avg = (int) (entrySizeSum/entryCount);
357 if(entryCount % 1000 == 0) {
358 LOGGER.fine("Average entry size at "+entryCount+": "+avg);
359 }
360 if (length>largestEntry) {
361 largestEntry = length;
362 LOGGER.fine("Largest entry: "+length+" "+curi);
363 if(length>(2*avg)) {
364 LOGGER.fine("excessive?");
365 }
366 }
367 }
368
369 /***
370 * Calculate the 'origin' key for a virtual queue of items
371 * with the given classKey. This origin key will be a
372 * prefix of the keys for all items in the queue.
373 *
374 * @param classKey String key to derive origin byte key from
375 * @return a byte array key
376 */
377 static byte[] calculateOriginKey(String classKey) {
378 byte[] classKeyBytes = null;
379 int len = 0;
380 try {
381 classKeyBytes = classKey.getBytes("UTF-8");
382 len = classKeyBytes.length;
383 } catch (UnsupportedEncodingException e) {
384
385 e.printStackTrace();
386 }
387 byte[] keyData = new byte[len+1];
388 System.arraycopy(classKeyBytes,0,keyData,0,len);
389 keyData[len]=0;
390 return keyData;
391 }
392
393 /***
394 * Calculate the insertKey that places a CrawlURI in the
395 * desired spot. First bytes are always classKey (usu. host)
396 * based -- ensuring grouping by host -- terminated by a zero
397 * byte. Then 8 bytes of data ensuring desired ordering
398 * within that 'queue' are used. The first byte of these 8 is
399 * priority -- allowing 'immediate' and 'soon' items to
400 * sort above regular. Next 1 byte is 'cost'. Last 6 bytes
401 * are ordinal serial number, ensuring earlier-discovered
402 * URIs sort before later.
403 *
404 * NOTE: Dangers here are:
405 * (1) priorities or costs over 2^7 (signed byte comparison)
406 * (2) ordinals over 2^48
407 *
408 * Package access & static for testing purposes.
409 *
410 * @param curi
411 * @return a DatabaseEntry key for the CrawlURI
412 */
413 static DatabaseEntry calculateInsertKey(CrawlURI curi) {
414 byte[] classKeyBytes = null;
415 int len = 0;
416 try {
417 classKeyBytes = curi.getClassKey().getBytes("UTF-8");
418 len = classKeyBytes.length;
419 } catch (UnsupportedEncodingException e) {
420
421 e.printStackTrace();
422 }
423 byte[] keyData = new byte[len+9];
424 System.arraycopy(classKeyBytes,0,keyData,0,len);
425 keyData[len]=0;
426 long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
427 ordinalPlus =
428 ((long)curi.getSchedulingDirective() << 56) | ordinalPlus;
429 ordinalPlus =
430 ((((long)curi.getHolderCost()) & 0xFFL) << 48) | ordinalPlus;
431 ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1);
432 return new DatabaseEntry(keyData);
433 }
434
435 /***
436 * Delete the given CrawlURI from persistent store. Requires
437 * the key under which it was stored be available.
438 *
439 * @param item
440 * @throws DatabaseException
441 */
442 public void delete(CrawlURI item) throws DatabaseException {
443 OperationStatus status;
444 status = pendingUrisDB.delete(null, (DatabaseEntry) item.getHolderKey());
445 if (status != OperationStatus.SUCCESS) {
446 LOGGER.severe("expected item not present: "
447 + item
448 + "("
449 + (new BigInteger(((DatabaseEntry) item.getHolderKey())
450 .getData())).toString(16) + ")");
451 }
452
453 }
454
455 /***
456 * Method used by BdbFrontier during checkpointing.
457 * <p>The backing bdbje database has been marked deferred write so we save
458 * on writes to disk. Means no guarantees disk will have whats in memory
459 * unless a sync is called (Calling sync on the bdbje Environment is not
460 * sufficent).
461 * <p>Package access only because only Frontiers of this package would ever
462 * need access.
463 * @see <a href="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a>
464 */
465 void sync() {
466 if (this.pendingUrisDB == null) {
467 return;
468 }
469 try {
470 this.pendingUrisDB.sync();
471 } catch (DatabaseException e) {
472 e.printStackTrace();
473 }
474 }
475
476 /***
477 * clean up
478 *
479 */
480 public void close() {
481 try {
482 this.pendingUrisDB.close();
483 } catch (DatabaseException e) {
484 e.printStackTrace();
485 }
486 }
487
488 /***
489 * Marker for remembering a position within the BdbMultipleWorkQueues.
490 *
491 * @author gojomo
492 */
493 public class BdbFrontierMarker implements FrontierMarker {
494 DatabaseEntry startKey;
495 Pattern pattern;
496 int nextItemNumber;
497
498 /***
499 * Create a marker pointed at the given start location.
500 *
501 * @param startKey
502 * @param regexpr
503 */
504 public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) {
505 this.startKey = startKey;
506 pattern = Pattern.compile(regexpr);
507 nextItemNumber = 1;
508 }
509
510 /***
511 * @param curi
512 * @return whether the marker accepts the given CrawlURI
513 */
514 public boolean accepts(CrawlURI curi) {
515 boolean retVal = pattern.matcher(curi.toString()).matches();
516 if(retVal==true) {
517 nextItemNumber++;
518 }
519 return retVal;
520 }
521
522 /***
523 * @param key position for marker
524 */
525 public void setStartKey(DatabaseEntry key) {
526 startKey = key;
527 }
528
529 /***
530 * @return startKey
531 */
532 public DatabaseEntry getStartKey() {
533 return startKey;
534 }
535
536
537
538
539 public String getMatchExpression() {
540 return pattern.pattern();
541 }
542
543
544
545
546 public long getNextItemNumber() {
547 return nextItemNumber;
548 }
549
550
551
552
553 public boolean hasNext() {
554
555 return startKey != null;
556 }
557 }
558
559 /***
560 * Add a dummy 'cap' entry at the given insertion key. Prevents
561 * 'seeks' to queue heads from holding lock on last item of
562 * 'preceding' queue. See:
563 * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
564 *
565 * @param origin key at which to insert the cap
566 */
567 public void addCap(byte[] origin) {
568 try {
569 pendingUrisDB.put(null, new DatabaseEntry(origin),
570 new DatabaseEntry(new byte[0]));
571 } catch (DatabaseException e) {
572 throw new RuntimeException(e);
573 }
574 }
575
576
577 /***
578 * Utility method to perform action for all pending CrawlURI instances.
579 * @param c Closure action to perform
580 * @throws DatabaseException
581 */
582 protected void forAllPendingDo(Closure c) throws DatabaseException {
583 DatabaseEntry key = new DatabaseEntry();
584 DatabaseEntry value = new DatabaseEntry();
585 Cursor cursor = pendingUrisDB.openCursor(null,null);
586 while(cursor.getNext(key,value,null)==OperationStatus.SUCCESS) {
587 if(value.getData().length==0) {
588 continue;
589 }
590 CrawlURI item = (CrawlURI)crawlUriBinding.entryToObject(value);
591 c.execute(item);
592 }
593 cursor.close();
594 }
595 }