View Javadoc

1   /* BdbMultipleWorkQueues
2    * 
3    * Created on Dec 24, 2004
4    *
5    * Copyright (C) 2004 Internet Archive.
6    * 
7    * This file is part of the Heritrix web crawler (crawler.archive.org).
8    * 
9    * Heritrix is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   * 
14   * Heritrix is distributed in the hope that it will be useful, 
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   * 
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.crawler.frontier;
24  
25  import java.io.UnsupportedEncodingException;
26  import java.math.BigInteger;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.logging.Level;
30  import java.util.logging.Logger;
31  import java.util.regex.Pattern;
32  
33  import org.apache.commons.collections.Closure;
34  import org.archive.crawler.datamodel.CrawlURI;
35  import org.archive.crawler.framework.FrontierMarker;
36  import org.archive.util.ArchiveUtils;
37  
38  import com.sleepycat.bind.serial.StoredClassCatalog;
39  import com.sleepycat.je.Cursor;
40  import com.sleepycat.je.Database;
41  import com.sleepycat.je.DatabaseConfig;
42  import com.sleepycat.je.DatabaseEntry;
43  import com.sleepycat.je.DatabaseException;
44  import com.sleepycat.je.DatabaseNotFoundException;
45  import com.sleepycat.je.Environment;
46  import com.sleepycat.je.OperationStatus;
47  import com.sleepycat.util.RuntimeExceptionWrapper;
48  
49  
50  /***
51   * A BerkeleyDB-database-backed structure for holding ordered
52   * groupings of CrawlURIs. Reading the groupings from specific
53   * per-grouping (per-classKey/per-Host) starting points allows
54   * this to act as a collection of independent queues. 
55   * 
56   * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}.
57   * 
58   * <p>TODO: refactor, improve naming.
59   * 
60   * @author gojomo
61   */
62  public class BdbMultipleWorkQueues {
63  	private static final long serialVersionUID = ArchiveUtils
64      	.classnameBasedUID(BdbMultipleWorkQueues.class, 1);
65  	
66      private static final Logger LOGGER =
67          Logger.getLogger(BdbMultipleWorkQueues.class.getName());
68      
69      /*** Database holding all pending URIs, grouped in virtual queues */
70      private Database pendingUrisDB = null;
71      
72      /***  Supporting bdb serialization of CrawlURIs */
73      private RecyclingSerialBinding crawlUriBinding;
74  
75      /***
76       * Create the multi queue in the given environment. 
77       * 
78       * @param env bdb environment to use
79       * @param classCatalog Class catalog to use.
80       * @param recycle True if we are to reuse db content if any.
81       * @throws DatabaseException
82       */
83      public BdbMultipleWorkQueues(Environment env,
84          StoredClassCatalog classCatalog, final boolean recycle)
85      throws DatabaseException {
86          // Open the database. Create it if it does not already exist. 
87          DatabaseConfig dbConfig = new DatabaseConfig();
88          dbConfig.setAllowCreate(true);
89          if (!recycle) {
90              try {
91                  env.truncateDatabase(null, "pending", false);
92              } catch (DatabaseNotFoundException e) {
93                  // Ignored
94              }
95          }
96          // Make database deferred write: URLs that are added then removed 
97          // before a page-out is required need never cause disk IO.
98          dbConfig.setDeferredWrite(true);
99  
100         this.pendingUrisDB = env.openDatabase(null, "pending", dbConfig);
101         crawlUriBinding =
102             new RecyclingSerialBinding(classCatalog, CrawlURI.class);
103     }
104 
105     /***
106      * Delete all CrawlURIs matching the given expression.
107      * 
108      * @param match
109      * @param queue
110      * @param headKey
111      * @return count of deleted items
112      * @throws DatabaseException
113      * @throws DatabaseException
114      */
115     public long deleteMatchingFromQueue(String match, String queue,
116             DatabaseEntry headKey) throws DatabaseException {
117         long deletedCount = 0;
118         Pattern pattern = Pattern.compile(match);
119         DatabaseEntry key = headKey;
120         DatabaseEntry value = new DatabaseEntry();
121         Cursor cursor = null;
122         try {
123             cursor = pendingUrisDB.openCursor(null, null);
124             OperationStatus result = cursor.getSearchKeyRange(headKey,
125                     value, null);
126 
127             while (result == OperationStatus.SUCCESS) {
128                 if(value.getData().length>0) {
129                     CrawlURI curi = (CrawlURI) crawlUriBinding
130                             .entryToObject(value);
131                     if (!curi.getClassKey().equals(queue)) {
132                         // rolled into next queue; finished with this queue
133                         break;
134                     }
135                     if (pattern.matcher(curi.toString()).matches()) {
136                         cursor.delete();
137                         deletedCount++;
138                     }
139                 }
140                 result = cursor.getNext(key, value, null);
141             }
142         } finally {
143             if (cursor != null) {
144                 cursor.close();
145             }
146         }
147 
148         return deletedCount;
149     }
150     
151     /***
152      * @param m marker
153      * @param maxMatches
154      * @return list of matches starting from marker position
155      * @throws DatabaseException
156      */
157     public List getFrom(FrontierMarker m, int maxMatches) throws DatabaseException {
158         int matches = 0;
159         int tries = 0;
160         ArrayList<CrawlURI> results = new ArrayList<CrawlURI>(maxMatches);
161         BdbFrontierMarker marker = (BdbFrontierMarker) m;
162         
163         DatabaseEntry key = marker.getStartKey();
164         DatabaseEntry value = new DatabaseEntry();
165         
166         if (key != null) {
167             Cursor cursor = null;
168             OperationStatus result = null;
169             try {
170                 cursor = pendingUrisDB.openCursor(null,null);
171                 // NOTE: this mutates key, and thus also the marker, 
172                 // advancing the marker as a side-effect for future 
173                 // followup operations
174                 result = cursor.getSearchKey(key, value, null);
175                 
176                 while(matches<maxMatches && result == OperationStatus.SUCCESS) {
177                     if(value.getData().length>0) {
178                         CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value);
179                         if(marker.accepts(curi)) {
180                             results.add(curi);
181                             matches++;
182                         }
183                         tries++;
184                     }
185                     result = cursor.getNext(key,value,null);
186                 }
187             } finally {
188                 if (cursor !=null) {
189                     cursor.close();
190                 }
191             }
192             
193             if(result != OperationStatus.SUCCESS) {
194                 // end of scan
195                 marker.setStartKey(null);
196             }
197         }
198         return results;
199     }
200     
201     /***
202      * Get a marker for beginning a scan over all contents
203      * 
204      * @param regexpr
205      * @return a marker pointing to the first item
206      */
207     public FrontierMarker getInitialMarker(String regexpr) {
208         try {
209             return new BdbFrontierMarker(getFirstKey(), regexpr);
210         } catch (DatabaseException e) {
211             e.printStackTrace();
212             return null; 
213         }
214     }
215     
216     /***
217      * @return the key to the first item in the database
218      * @throws DatabaseException
219      */
220     protected DatabaseEntry getFirstKey() throws DatabaseException {
221         DatabaseEntry key = new DatabaseEntry();
222         DatabaseEntry value = new DatabaseEntry();
223         Cursor cursor = pendingUrisDB.openCursor(null,null);
224         OperationStatus status = cursor.getNext(key,value,null);
225         cursor.close();
226         if(status == OperationStatus.SUCCESS) {
227             return key;
228         }
229         return null;
230     }
231     
232     /***
233      * Get the next nearest item after the given key. Relies on 
234      * external discipline -- we'll look at the queues count of how many
235      * items it has -- to avoid asking for something from a
236      * range where there are no associated items --
237      * otherwise could get first item of next 'queue' by mistake. 
238      * 
239      * <p>TODO: hold within a queue's range
240      * 
241      * @param headKey Key prefix that demarks the beginning of the range
242      * in <code>pendingUrisDB</code> we're interested in.
243      * @return CrawlURI.
244      * @throws DatabaseException
245      */
246     public CrawlURI get(DatabaseEntry headKey)
247     throws DatabaseException {
248         DatabaseEntry result = new DatabaseEntry();
249         
250         // From Linda Lee of sleepycat:
251         // "You want to check the status returned from Cursor.getSearchKeyRange
252         // to make sure that you have OperationStatus.SUCCESS. In that case,
253         // you have found a valid data record, and result.getData()
254         // (called by internally by the binding code, in this case) will be
255         // non-null. The other possible status return is
256         // OperationStatus.NOTFOUND, in which case no data record matched
257         // the criteria. "
258         OperationStatus status = getNextNearestItem(headKey, result);
259         CrawlURI retVal = null;
260         if (status != OperationStatus.SUCCESS) {
261             LOGGER.severe("See '1219854 NPE je-2.0 "
262                     + "entryToObject...'. OperationStatus "
263                     + " was not SUCCESS: "
264                     + status
265                     + ", headKey "
266                     + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
267             return null;
268         }
269         try {
270             retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
271         } catch (RuntimeExceptionWrapper rw) {
272             LOGGER.log(
273                 Level.SEVERE,
274                 "expected object missing in queue " +
275                 BdbWorkQueue.getPrefixClassKey(headKey.getData()),
276                 rw);
277             return null; 
278         }
279         retVal.setHolderKey(headKey);
280         return retVal;
281     }
282     
283     protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
284             DatabaseEntry result) throws DatabaseException {
285         Cursor cursor = null;
286         OperationStatus status;
287         try {
288             cursor = this.pendingUrisDB.openCursor(null, null);
289             // get cap; headKey at this point should always point to 
290             // a queue-beginning cap entry (zero-length value)
291             status = cursor.getSearchKey(headKey, result, null);
292             if (status != OperationStatus.SUCCESS) {
293                 LOGGER.severe("bdb queue cap missing: " 
294                         + status.toString() + " "  + new String(headKey.getData()));
295                 return status;
296             }
297             if (result.getData().length > 0) {
298                 LOGGER.severe("bdb queue has nonzero size: " 
299                         + result.getData().length);
300                 return OperationStatus.KEYEXIST;
301             }
302             // get next item (real first item of queue)
303             status = cursor.getNext(headKey,result,null);
304         } finally { 
305             if(cursor!=null) {
306                 cursor.close();
307             }
308         }
309         return status;
310     }
311     
312     /***
313      * Put the given CrawlURI in at the appropriate place. 
314      * 
315      * @param curi
316      * @throws DatabaseException
317      */
318     public void put(CrawlURI curi, boolean overwriteIfPresent) 
319     throws DatabaseException {
320         DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
321         if (insertKey == null) {
322             insertKey = calculateInsertKey(curi);
323             curi.setHolderKey(insertKey);
324         }
325         DatabaseEntry value = new DatabaseEntry();
326         crawlUriBinding.objectToEntry(curi, value);
327         // Output tally on avg. size if level is FINE or greater.
328         if (LOGGER.isLoggable(Level.FINE)) {
329             tallyAverageEntrySize(curi, value);
330         }
331         OperationStatus status; 
332         if(overwriteIfPresent) {
333             status = pendingUrisDB.put(null, insertKey, value);
334         } else {
335             status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
336         }
337         if(status!=OperationStatus.SUCCESS) {
338             LOGGER.severe("failed; "+status+ " "+curi);
339         }
340     }
341     
342     private long entryCount = 0;
343     private long entrySizeSum = 0;
344     private int largestEntry = 0;
345     
346     /***
347      * Log average size of database entry.
348      * @param curi CrawlURI this entry is for.
349      * @param value Database entry value.
350      */
351     private synchronized void tallyAverageEntrySize(CrawlURI curi,
352             DatabaseEntry value) {
353         entryCount++;
354         int length = value.getData().length;
355         entrySizeSum += length;
356         int avg = (int) (entrySizeSum/entryCount);
357         if(entryCount % 1000 == 0) {
358             LOGGER.fine("Average entry size at "+entryCount+": "+avg);
359         }
360         if (length>largestEntry) {
361             largestEntry = length; 
362             LOGGER.fine("Largest entry: "+length+" "+curi);
363             if(length>(2*avg)) {
364                 LOGGER.fine("excessive?");
365             }
366         }
367     }
368 
369     /***
370      * Calculate the 'origin' key for a virtual queue of items
371      * with the given classKey. This origin key will be a 
372      * prefix of the keys for all items in the queue. 
373      * 
374      * @param classKey String key to derive origin byte key from 
375      * @return a byte array key 
376      */
377     static byte[] calculateOriginKey(String classKey) {
378         byte[] classKeyBytes = null;
379         int len = 0;
380         try {
381             classKeyBytes = classKey.getBytes("UTF-8");
382             len = classKeyBytes.length;
383         } catch (UnsupportedEncodingException e) {
384             // should be impossible; all JVMs must support UTF-8
385             e.printStackTrace();
386         }
387         byte[] keyData = new byte[len+1];
388         System.arraycopy(classKeyBytes,0,keyData,0,len);
389         keyData[len]=0;
390         return keyData;
391     }
392     
393     /***
394      * Calculate the insertKey that places a CrawlURI in the
395      * desired spot. First bytes are always classKey (usu. host)
396      * based -- ensuring grouping by host -- terminated by a zero
397      * byte. Then 8 bytes of data ensuring desired ordering 
398      * within that 'queue' are used. The first byte of these 8 is
399      * priority -- allowing 'immediate' and 'soon' items to 
400      * sort above regular. Next 1 byte is 'cost'. Last 6 bytes 
401      * are ordinal serial number, ensuring earlier-discovered 
402      * URIs sort before later. 
403      * 
404      * NOTE: Dangers here are:
405      * (1) priorities or costs over 2^7 (signed byte comparison)
406      * (2) ordinals over 2^48
407      * 
408      * Package access & static for testing purposes. 
409      * 
410      * @param curi
411      * @return a DatabaseEntry key for the CrawlURI
412      */
413     static DatabaseEntry calculateInsertKey(CrawlURI curi) {
414         byte[] classKeyBytes = null;
415         int len = 0;
416         try {
417             classKeyBytes = curi.getClassKey().getBytes("UTF-8");
418             len = classKeyBytes.length;
419         } catch (UnsupportedEncodingException e) {
420             // should be impossible; all JVMs must support UTF-8
421             e.printStackTrace();
422         }
423         byte[] keyData = new byte[len+9];
424         System.arraycopy(classKeyBytes,0,keyData,0,len);
425         keyData[len]=0;
426         long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
427         ordinalPlus = 
428         	((long)curi.getSchedulingDirective() << 56) | ordinalPlus;
429         ordinalPlus = 
430         	((((long)curi.getHolderCost()) & 0xFFL) << 48) | ordinalPlus;
431         ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1);
432         return new DatabaseEntry(keyData);
433     }
434     
435     /***
436      * Delete the given CrawlURI from persistent store. Requires
437      * the key under which it was stored be available. 
438      * 
439      * @param item
440      * @throws DatabaseException
441      */
442     public void delete(CrawlURI item) throws DatabaseException {
443         OperationStatus status;
444         status = pendingUrisDB.delete(null, (DatabaseEntry) item.getHolderKey());
445         if (status != OperationStatus.SUCCESS) {
446             LOGGER.severe("expected item not present: "
447                     + item
448                     + "("
449                     + (new BigInteger(((DatabaseEntry) item.getHolderKey())
450                             .getData())).toString(16) + ")");
451         }
452 
453     }
454     
455     /***
456      * Method used by BdbFrontier during checkpointing.
457      * <p>The backing bdbje database has been marked deferred write so we save
458      * on writes to disk.  Means no guarantees disk will have whats in memory
459      * unless a sync is called (Calling sync on the bdbje Environment is not
460      * sufficent).
461      * <p>Package access only because only Frontiers of this package would ever
462      * need access.
463      * @see <a href="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a>
464      */
465     void sync() {
466     	if (this.pendingUrisDB == null) {
467     		return;
468     	}
469         try {
470             this.pendingUrisDB.sync();
471         } catch (DatabaseException e) {
472             e.printStackTrace();
473         }
474     }
475     
476     /***
477      * clean up 
478      *
479      */
480     public void close() {
481         try {
482             this.pendingUrisDB.close();
483         } catch (DatabaseException e) {
484             e.printStackTrace();
485         }
486     }
487     
488     /***
489      * Marker for remembering a position within the BdbMultipleWorkQueues.
490      * 
491      * @author gojomo
492      */
493     public class BdbFrontierMarker implements FrontierMarker {
494         DatabaseEntry startKey;
495         Pattern pattern; 
496         int nextItemNumber;
497         
498         /***
499          * Create a marker pointed at the given start location.
500          * 
501          * @param startKey
502          * @param regexpr
503          */
504         public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) {
505             this.startKey = startKey;
506             pattern = Pattern.compile(regexpr);
507             nextItemNumber = 1;
508         }
509         
510         /***
511          * @param curi
512          * @return whether the marker accepts the given CrawlURI
513          */
514         public boolean accepts(CrawlURI curi) {
515             boolean retVal = pattern.matcher(curi.toString()).matches();
516             if(retVal==true) {
517                 nextItemNumber++;
518             }
519             return retVal;
520         }
521         
522         /***
523          * @param key position for marker
524          */
525         public void setStartKey(DatabaseEntry key) {
526             startKey = key;
527         }
528         
529         /***
530          * @return startKey
531          */
532         public DatabaseEntry getStartKey() {
533             return startKey;
534         }
535         
536         /* (non-Javadoc)
537          * @see org.archive.crawler.framework.FrontierMarker#getMatchExpression()
538          */
539         public String getMatchExpression() {
540             return pattern.pattern();
541         }
542         
543         /* (non-Javadoc)
544          * @see org.archive.crawler.framework.FrontierMarker#getNextItemNumber()
545          */
546         public long getNextItemNumber() {
547             return nextItemNumber;
548         }
549         
550         /* (non-Javadoc)
551          * @see org.archive.crawler.framework.FrontierMarker#hasNext()
552          */
553         public boolean hasNext() {
554             // as long as any startKey is stated, consider as having next
555             return startKey != null;
556         }
557     }
558 
559     /***
560      * Add a dummy 'cap' entry at the given insertion key. Prevents
561      * 'seeks' to queue heads from holding lock on last item of 
562      * 'preceding' queue. See:
563      * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
564      * 
565      * @param origin key at which to insert the cap
566      */
567     public void addCap(byte[] origin) {
568         try {
569             pendingUrisDB.put(null, new DatabaseEntry(origin),
570                     new DatabaseEntry(new byte[0]));
571         } catch (DatabaseException e) {
572             throw new RuntimeException(e);
573         }
574     }
575     
576     
577     /***
578      * Utility method to perform action for all pending CrawlURI instances.
579      * @param c Closure action to perform
580      * @throws DatabaseException
581      */
582     protected void forAllPendingDo(Closure c) throws DatabaseException {
583         DatabaseEntry key = new DatabaseEntry();
584         DatabaseEntry value = new DatabaseEntry();
585         Cursor cursor = pendingUrisDB.openCursor(null,null);
586         while(cursor.getNext(key,value,null)==OperationStatus.SUCCESS) {
587             if(value.getData().length==0) {
588                 continue;
589             }
590             CrawlURI item = (CrawlURI)crawlUriBinding.entryToObject(value);
591             c.execute(item);
592         }
593         cursor.close();
594     }
595 }