View Javadoc

1   /* AdaptiveRevisitHostQueue
2   *
3   * Created on Sep 13, 2004
4   *
5   * Copyright (C) 2004 Kristinn Sigurðsson.
6   *
7   * This file is part of the Heritrix web crawler (crawler.archive.org).
8   *
9   * Heritrix is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU Lesser Public License as published by
11  * the Free Software Foundation; either version 2.1 of the License, or
12  * any later version.
13  *
14  * Heritrix is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Lesser Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser Public License
20  * along with Heritrix; if not, write to the Free Software
21  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  */
23  package org.archive.crawler.frontier;
24  
25  import java.io.IOException;
26  import java.util.logging.Level;
27  import java.util.logging.Logger;
28  
29  import org.archive.crawler.datamodel.CandidateURI;
30  import org.archive.crawler.datamodel.CrawlSubstats;
31  import org.archive.crawler.datamodel.CrawlURI;
32  import org.archive.crawler.framework.Frontier.FrontierGroup;
33  import org.archive.util.ArchiveUtils;
34  
35  import com.sleepycat.bind.EntryBinding;
36  import com.sleepycat.bind.serial.ClassCatalog;
37  import com.sleepycat.bind.serial.SerialBinding;
38  import com.sleepycat.bind.serial.StoredClassCatalog;
39  import com.sleepycat.bind.serial.TupleSerialKeyCreator;
40  import com.sleepycat.bind.tuple.StringBinding;
41  import com.sleepycat.bind.tuple.TupleBinding;
42  import com.sleepycat.bind.tuple.TupleInput;
43  import com.sleepycat.bind.tuple.TupleOutput;
44  import com.sleepycat.je.Cursor;
45  import com.sleepycat.je.Database;
46  import com.sleepycat.je.DatabaseConfig;
47  import com.sleepycat.je.DatabaseEntry;
48  import com.sleepycat.je.DatabaseException;
49  import com.sleepycat.je.Environment;
50  import com.sleepycat.je.LockMode;
51  import com.sleepycat.je.OperationStatus;
52  import com.sleepycat.je.SecondaryConfig;
53  import com.sleepycat.je.SecondaryDatabase;
54  
55  /***
56   * A priority based queue of CrawlURIs. Each queue should represent
57   * one host (although this is not enforced in this class). Items are ordered
58   * by the scheduling directive and time of next processing (in that order) 
59   * and also indexed by the URI.
60   * <p>
61   * The HQ does no calculations on the 'time of next processing.' It always 
62   * relies on values already set on the CrawlURI.
63   * <p>
64   * Note: Class is not 'thread safe.' In multi threaded environment the caller
65   * must ensure that two threads do not make overlapping calls.
66   * <p>
67   * Any BDB DatabaseException will be converted to an IOException by public 
68   * methods. This includes preserving the original stacktrace, in favor of the
69   * one created for the IOException, so that the true source of the exception
70   * is not lost. 
71   *
72   * @author Kristinn Sigurdsson
73   */
74  public class AdaptiveRevisitHostQueue
75  implements AdaptiveRevisitAttributeConstants, FrontierGroup {
76      
77      // TODO: Need to be able to remove URIs, both by name and reg.expr.
78      
79      // Constant declarations
80      /*** HQ contains no queued CrawlURIs elements. This state only occurs after 
81       *  queue creation before the first add. After the first item is added the 
82       *  state can never become empty again.  */
83      public static final int HQSTATE_EMPTY = 0;
84      /*** HQ has a CrawlURI ready for processing */
85      public static final int HQSTATE_READY = 1;
86      /*** HQ has maximum number of CrawlURI currently being processed. This number
87       *  is either equal to the 'valence' (maximum number of simultaneous 
88       *  connections to a host) or (if smaller) the total number of CrawlURIs 
89       *  in the HQ. */
90      public static final int HQSTATE_BUSY = 2; 
91      /*** HQ is in a suspended state until it can be woken back up */
92      public static final int HQSTATE_SNOOZED = 3;
93      
94      // Internal class variables
95      /*** Name of the host that this AdaptiveRevisitHostQueue represents */
96      final String hostName;
97      /*** Last known state of HQ -- ALL methods should use getState() to read
98       *  this value, never read it directly. */
99      int state;
100     /*** Time (in milliseconds) when the HQ will next be ready to issue a URI
101      *  for processing. When setting this value, methods should use the 
102      *  setter method {@link #setNextReadyTime(long) setNextReadyTime()}
103      */
104     long nextReadyTime;
105     /*** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
106      *  Any positive value larger than the current time signifies a taken slot 
107      *  where the URI has completed processing but the politeness wait has not
108      *  ended. <p>
109      *  A zero or positive value smaller than the current time in milliseconds
110      *  signifies an empty slot.<p> 
111      *  Any negative value signifies a slot for a URI that is being processed.
112      *  <p>
113      *  Methods should never write directly to this, rather use the 
114      *  {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
115      *  {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed.
116      */
117     long[] wakeUpTime;
118     /*** Number of simultaneous connections permitted to this host. I.e. this 
119      *  many URIs can be issued before state of HQ becomes busy until one of 
120      *  them is returned via the update method. */
121     int valence;
122     /***
123      * Size of queue. That is, the number of CrawlURIs that have been added to
124      * it, including any that are currently being processed.
125      */
126     long size;
127     /*** Number of URIs belonging to this queue that are being processed at the
128      *  moment. This number will always be in the range of 0 - valence 
129      */
130     long inProcessing;
131     /*** The AdaptiveRevisitHostQueueList that contains this class. This
132      * reference is 
133      *  maintained to inform the owning class of changes to the sort order
134      *  value. Value may be null, in which case no notices are made.*/
135     private AdaptiveRevisitQueueList owner;
136     /*** Logger */
137     private static final Logger logger = 
138         Logger.getLogger(AdaptiveRevisitHostQueue.class.getName());
139     
140     protected CrawlSubstats substats = new CrawlSubstats(); 
141     
142     // Berkeley DB - All class member variables related to BDB JE
143     // Databases
144     /*** Database containing the URI priority queue, indexed by the 
145       * URI string. */
146     protected Database primaryUriDB;
147     /*** Secondary index into {@link #primaryUriDB the primary DB}, URIs indexed
148      *  by the time when they can next be processed again. */
149     protected SecondaryDatabase secondaryUriDB;
150     /*** A database containing those URIs that are currently being processed. */
151     protected Database processingUriDB;
152     // Serialization support
153     /*** For BDB serialization of objects */
154     protected StoredClassCatalog classCatalog;
155     /*** A binding for the serialization of the primary key (URI string) */
156     protected EntryBinding primaryKeyBinding;
157     /*** A binding for the CrawlURIARWrapper object */
158     protected EntryBinding crawlURIBinding;
159     // Cursors into databases
160 
161     
162     
163     /***
164      * Constructor
165      * 
166      * @param hostName Name of the host this queue represents. This name must
167      *                 be unique for all HQs in the same Environment.
168      * @param env Berkeley DB Environment. All BDB databases created will use 
169      *            it.
170      * @param catalog Db for bdb class serialization.
171      * @param valence The total number of simultanous URIs that the HQ can issue
172      *                for processing. Once this many URIs have been issued for
173      *                processing, the HQ will go into {@link #HQSTATE_BUSY busy}
174      *                state until at least one of the URI is 
175      *                {@link #update(CrawlURI, boolean, long) updated}.
176      *                Value should be larger then zero. Zero and negative values
177      *                will be treated same as 1.
178      * 
179      * @throws IOException if an error occurs opening/creating the 
180      *         database
181      */
182     public AdaptiveRevisitHostQueue(String hostName, Environment env,
183             StoredClassCatalog catalog, int valence)
184     throws IOException {
185         try{
186             if(valence < 1) {
187                 this.valence = 1;
188             } else {
189                 this.valence = valence;
190             }
191             wakeUpTime = new long[valence];
192             for(int i = 0 ; i < valence ; i++){
193                 wakeUpTime[i]=0; // 0 means open slot.
194             }
195             
196             inProcessing = 0;
197             
198             this.hostName = hostName;
199             
200             state = HQSTATE_EMPTY; //HQ is initially empty.
201             nextReadyTime = Long.MAX_VALUE; //Empty and busy HQ get this value.
202             
203             // Set up the primary URI database, it is indexed by URI names 
204             DatabaseConfig dbConfig = new DatabaseConfig();
205             dbConfig.setTransactional(false); 
206             dbConfig.setAllowCreate(true);
207             primaryUriDB = env.openDatabase(null, hostName, dbConfig);
208     
209             this.classCatalog = catalog;
210             
211             // Set up a DB for storing URIs being processed
212             DatabaseConfig dbConfig2 = new DatabaseConfig();
213             dbConfig2.setTransactional(false); 
214             dbConfig2.setAllowCreate(true);
215             processingUriDB = env.openDatabase(null, 
216                     hostName + "/processing", dbConfig2);
217             
218             // Create a primitive binding for the primary key (URI string) 
219             primaryKeyBinding = TupleBinding.getPrimitiveBinding(String.class);
220             // Create a serial binding for the CrawlURI object 
221             crawlURIBinding = new SerialBinding(classCatalog, CrawlURI.class);
222     
223             // Open a secondary database to allow accessing the primary
224             // database by the secondary key value.
225             SecondaryConfig secConfig = new SecondaryConfig();
226             secConfig.setAllowCreate(true);
227             secConfig.setSortedDuplicates(true);
228             secConfig.setKeyCreator(
229                     new OrderOfProcessingKeyCreator(classCatalog,CrawlURI.class));
230             secondaryUriDB = env.openSecondaryDatabase(null, 
231                 hostName+"/timeOfProcessing", primaryUriDB, secConfig);
232             
233             // Check if we are opening an existing DB...
234             size = countCrawlURIs();
235             if (size > 0) {
236                 // If size > 0 then we just opened an existing DB.
237                 // Set nextReadyTime;
238                 nextReadyTime = peek().getLong(
239                         A_TIME_OF_NEXT_PROCESSING);
240                 // Move any items in processingUriDB into the primariUriDB, ensure
241                 // that they wind up on top!
242                 flushProcessingURIs();
243                 state = HQSTATE_READY;
244             } 
245         } catch (DatabaseException e) {
246             // Blanket catch all DBExceptions and convert to IOExceptions.
247             IOException e2 = new IOException(e.getMessage());
248             e2.setStackTrace(e.getStackTrace());
249             throw e2; 
250         }
251     }
252     
253     /***
254      * Returns the HQ's name
255      * @return the HQ's name
256      */
257     public String getHostName() {
258         return hostName;
259     }
260     
261     /***
262      * Add a CrawlURI to this host queue.
263      * <p>
264      * Calls can optionally chose to have the time of next processing value 
265      * override existing values for the URI if the existing values are 'later'
266      * then the new ones. 
267      * 
268      * @param curi The CrawlURI to add.
269      * @param overrideSetTimeOnDups If true then the time of next processing for
270      *                           the supplied URI will override the any
271      *                           existing time for it already stored in the HQ.
272      *                           If false, then no changes will be made to any
273      *                           existing values of the URI. Note: Will never
274      *                           override with a later time.
275      * @throws IOException When an error occurs accessing the database
276      */
277     public void add(CrawlURI curi, boolean overrideSetTimeOnDups) 
278             throws IOException{
279         if(logger.isLoggable(Level.FINER)){
280             logger.finer("Adding " + curi.toString());
281         }
282         try{
283             if(inProcessing(curi.toString())){
284                 // If it is currently being processed, then it is already been 
285                 // added and we sure as heck can't fetch it any sooner!
286                 return;
287             }
288             
289             OperationStatus opStatus = strictAdd(curi,false);
290             
291             long curiProcessingTime = curi.getLong(
292                     A_TIME_OF_NEXT_PROCESSING);
293     
294             if (opStatus == OperationStatus.KEYEXIST){ 
295                 // Override an existing URI
296                 // We need to extract the old CrawlURI (it contains vital
297                 // info on past crawls), check its scheduling directive
298                 // and (possibly) its time of next fetch and update if it
299                 // will promote the URI to an earlier processing time.
300                 boolean update = false;
301                 CrawlURI curiExisting = getCrawlURI(curi.toString());
302                 long oldCuriProcessingTime = curiExisting.getLong(
303                         A_TIME_OF_NEXT_PROCESSING);
304                 if(curi.getSchedulingDirective() < 
305                         curiExisting.getSchedulingDirective()){
306                     // New scheduling directive is of higher importance,
307                     // update to promote URI.
308                     curiExisting.setSchedulingDirective(
309                             curi.getSchedulingDirective());
310                     update = true;
311                 }
312                 if( (curiProcessingTime < oldCuriProcessingTime)
313                         && (overrideSetTimeOnDups || update)){
314                     // We update the processing time if it is earlier then 
315                     // the original and either overrideSetTimeOnDups was set
316                     // or update is true, meaning a higher priority scheduling
317                     // directive for this URI.
318                     curiExisting.putLong(
319                             A_TIME_OF_NEXT_PROCESSING,
320                             curiProcessingTime);
321                     update = true;
322                 }
323                 if(update){
324                     opStatus = strictAdd(curiExisting,true); //Override
325                 } else {
326                     return;
327                 }
328             } else if(opStatus == OperationStatus.SUCCESS) {
329                 // Just inserted a brand new CrawlURI into the queue.
330                 size++;
331             }
332     
333             // Finally, check if insert (fresh add or override) into DB was 
334             // successful and if so check if we need to update nextReadyTime.
335             if(opStatus == OperationStatus.SUCCESS){
336                 if (curiProcessingTime < nextReadyTime){
337                     // Update nextReadyTime to reflect new value.
338                     setNextReadyTime(curiProcessingTime);
339                 }
340                 if(state == HQSTATE_EMPTY){
341                     // Definately no longer empty.
342                     state = HQSTATE_READY;
343                 }
344             } else {
345                 // Something went wrong. Throw an exception.
346                 throw new IOException("Error on add into database for " +
347                         "CrawlURI " + curi.toString() + ". " + 
348                         opStatus.toString());
349             }
350         } catch (DatabaseException e) {
351             // Blanket catch all DBExceptions and convert to IOExceptions.
352             IOException e2 = new IOException(e.getMessage());
353             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
354             throw e2; 
355         }
356         reorder(); // May need a reorder.
357     }
358     
359     /***
360      * An internal method for adding URIs to the queue. 
361      * 
362      * @param curi The CrawlURI to add
363      * @param overrideDuplicates If true then any existing CrawlURI in the DB
364      *                           will be overwritten. If false insert into the
365      *                           queue is only performed if the key doesn't 
366      *                           already exist.
367      * @return The OperationStatus object returned by the put method.
368      * 
369      * @throws DatabaseException
370      */
371     protected OperationStatus strictAdd(CrawlURI curi,
372         boolean overrideDuplicates)
373     throws DatabaseException{
374         DatabaseEntry keyEntry = new DatabaseEntry();
375         DatabaseEntry dataEntry = new DatabaseEntry();
376         primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
377         crawlURIBinding.objectToEntry(curi, dataEntry);
378         OperationStatus opStatus = null;
379         if(overrideDuplicates){
380             opStatus = primaryUriDB.put(null,keyEntry,dataEntry);
381         } else {
382             opStatus = primaryUriDB.putNoOverwrite(null,keyEntry,dataEntry);
383         }
384         
385         return opStatus;
386     }
387     
388     /***
389      * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs
390      * flushed will have their 'time of next fetch' maintained and the 
391      * nextReadyTime will be updated if needed.
392      * <p>
393      * No change is made to the list of available slots. 
394      * 
395      * @throws DatabaseException if one occurs while flushing
396      */
397     protected void flushProcessingURIs() throws DatabaseException, IOException {
398         Cursor processingCursor = processingUriDB.openCursor(null,null);
399         DatabaseEntry keyEntry = new DatabaseEntry();
400         DatabaseEntry dataEntry = new DatabaseEntry();
401         
402         while(true){
403             OperationStatus opStatus = processingCursor.getFirst(
404                     keyEntry, dataEntry, LockMode.DEFAULT);
405             
406             if(opStatus == OperationStatus.SUCCESS){
407                 // Got one!
408                 CrawlURI curi = 
409                     (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
410                 // Delete it from processingUriDB
411                 deleteInProcessing(curi.toString());
412                 // Add to processingUriDB;
413                 strictAdd(curi,false); // Ignore any duplicates. Go with the
414                                        // ones already in the queue.
415                 // Update nextReadyTime if needed.
416                 long curiNextReadyTime = curi.getLong(
417                         A_TIME_OF_NEXT_PROCESSING);
418                 if(curiNextReadyTime<nextReadyTime){
419                     setNextReadyTime(curiNextReadyTime);
420                 }
421             } else {
422                 // No more entries in processingUriDB
423                 processingCursor.close();
424                 return;
425             }
426         } 
427     }
428     
429     /***
430      * Count all entries in both primaryUriDB and processingUriDB.
431      * <p>
432      * This method is needed since BDB does not provide a simple way of counting
433      * entries.
434      * <p>
435      * Note: This is an expensive operation, requires a loop through the entire
436      * queue!
437      * @return the number of distinct CrawlURIs in the HQ.
438      * @throws DatabaseException
439      */
440     protected long countCrawlURIs() throws DatabaseException{
441         // TODO: Instead of all this, the value should be simply read from the
442         //       database.
443         long count = 0;
444         
445         DatabaseEntry keyEntry = new DatabaseEntry();
446         DatabaseEntry dataEntry = new DatabaseEntry();        
447         
448         // Count URIs in the queue
449         Cursor primaryCursor = primaryUriDB.openCursor(null,null);
450         OperationStatus opStatus = primaryCursor.getFirst(keyEntry,
451                                                             dataEntry,
452                                                             LockMode.DEFAULT);
453         while(opStatus == OperationStatus.SUCCESS){
454             count++;
455             opStatus = primaryCursor.getNext(keyEntry,
456                                              dataEntry,
457                                              LockMode.DEFAULT);
458         }
459         
460         primaryCursor.close();
461 
462         // Now count URIs in the processingUriDB
463         Cursor processingCursor = processingUriDB.openCursor(null,null);
464         opStatus = processingCursor.getFirst(keyEntry,
465                                              dataEntry,
466                                              LockMode.DEFAULT);
467         while(opStatus == OperationStatus.SUCCESS){
468             count++;
469             opStatus = processingCursor.getNext(keyEntry,
470                                                 dataEntry,
471                                                 LockMode.DEFAULT);
472         }
473         
474         processingCursor.close();
475         return count;
476     }
477     
478     /***
479      * Returns true if this HQ has a CrawlURI matching the uri string currently
480      * being processed. False otherwise.
481      * 
482      * @param uri Uri to check
483      * @return true if this HQ has a CrawlURI matching the uri string currently
484      * being processed. False otherwise.
485      * 
486      * @throws DatabaseException
487      */
488     protected boolean inProcessing(String uri) throws DatabaseException{
489         DatabaseEntry keyEntry = new DatabaseEntry();
490         DatabaseEntry dataEntry = new DatabaseEntry();
491                 
492         StringBinding.stringToEntry(uri,keyEntry);
493         
494         OperationStatus opStatus = processingUriDB.get(null,
495                                                        keyEntry,
496                                                        dataEntry,
497                                                        LockMode.DEFAULT);
498         
499         if (opStatus == OperationStatus.SUCCESS){
500             return true;
501         }
502         
503         return false; //Not found
504     }
505     
506     /***
507      * Removes a URI from the list of URIs belonging to this HQ and are 
508      * currently being processed.
509      * <p>
510      * Returns true if successful, false if the URI was not found.
511      * 
512      * @param uri The URI string of the CrawlURI to delete.
513      * 
514      * @throws DatabaseException
515      * @throws IllegalStateException if the URI was not on the list
516      */
517     protected void deleteInProcessing(String uri) throws DatabaseException, IOException {
518         DatabaseEntry keyEntry = new DatabaseEntry();
519 
520         StringBinding.stringToEntry(uri, keyEntry);
521 
522         OperationStatus opStatus = processingUriDB.delete(null, keyEntry);
523 
524         if (opStatus != OperationStatus.SUCCESS) {
525             if (opStatus == OperationStatus.NOTFOUND) {
526                 throw new IllegalStateException("Trying to deleta a "
527                         + "non-existant URI from the list of URIs being "
528                         + "processed. HQ: " + hostName + ", CrawlURI: " + uri);
529             }
530             throw new IOException("Error occured deleting URI: " + uri
531                     + " from HQ " + hostName + " list "
532                     + "of URIs currently being processed. "
533                     + opStatus.toString());
534         }
535     }
536 
537     /***
538      * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are
539      * being processed at the moment.
540      * 
541      * @param curi
542      *            The CrawlURI to add to the list
543      * @throws DatabaseException
544      * @throws IllegalStateException
545      *             if the CrawlURI is already in the list of URIs being
546      *             processed.
547      */
548     protected void addInProcessing(CrawlURI curi) throws DatabaseException,
549             IllegalStateException {
550         DatabaseEntry keyEntry = new DatabaseEntry();
551         DatabaseEntry dataEntry = new DatabaseEntry();
552 
553         StringBinding.stringToEntry(curi.toString(), keyEntry);
554         crawlURIBinding.objectToEntry(curi, dataEntry);
555 
556         OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
557                 keyEntry, dataEntry);
558 
559         if (opStatus != OperationStatus.SUCCESS) {
560             if (opStatus == OperationStatus.KEYEXIST) {
561                 throw new IllegalStateException("Can not insert duplicate "
562                         + "URI into list of URIs being processed. " + "HQ: "
563                         + hostName + ", CrawlURI: " + curi.toString());
564             }
565             throw new IllegalStateException("Error occured adding CrawlURI: "
566                     + curi.toString() + " to HQ " + hostName + " list "
567                     + "of URIs currently being processed. "
568                     + opStatus.toString());
569         }
570     }
571     
572     /***
573      * Returns the CrawlURI associated with the specified URI (string) or null
574      * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
575      * it is not considered to be <i>queued </i> and this method will return
576      * null for any such URIs.
577      * 
578      * @param uri
579      *            A string representing the URI
580      * @return the CrawlURI associated with the specified URI (string) or null
581      *         if no such CrawlURI is queued in this HQ.
582      * 
583      * @throws DatabaseException
584      *             if a errors occurs reading the database
585      */
586     protected CrawlURI getCrawlURI(String uri) throws DatabaseException{
587         DatabaseEntry keyEntry = new DatabaseEntry();
588         DatabaseEntry dataEntry = new DatabaseEntry();
589         
590         primaryKeyBinding.objectToEntry(uri,keyEntry);
591         primaryUriDB.get(null,keyEntry,dataEntry,LockMode.DEFAULT);
592         
593         CrawlURI curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
594         
595         return curi;
596     }
597 
598     /***
599      * Update CrawlURI that has completed processing.
600      * 
601      * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's 
602      *             {@link #next() next()} method.
603      * @param needWait If true then the URI was processed successfully, 
604      *                 requiring a period of suspended action on that host. If
605      *                 valence is > 1 then seperate times are maintained for 
606      *                 each slot.
607      * @param wakeupTime If new state is 
608      *                   {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
609      *                   then this parameter should contain the time (in 
610      *                   milliseconds) when it will be safe to wake the HQ up
611      *                   again. Otherwise this parameter will be ignored.
612      * 
613      * @throws IllegalStateException if the CrawlURI
614      *         does not match a CrawlURI issued for crawling by this HQ's
615      *         {@link AdaptiveRevisitHostQueue#next() next()}.
616      * @throws IOException if an error occurs accessing the database
617      */
618     public void update(CrawlURI curi, 
619                        boolean needWait, 
620                        long wakeupTime) 
621             throws IllegalStateException, IOException{
622         update(curi,needWait,wakeupTime,false);
623     }
624     
625     
626     /***
627      * Update CrawlURI that has completed processing.
628      * 
629      * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's 
630      *             {@link #next() next()} method.
631      * @param needWait If true then the URI was processed successfully, 
632      *                 requiring a period of suspended action on that host. If
633      *                 valence is > 1 then seperate times are maintained for 
634      *                 each slot.
635      * @param wakeupTime If new state is 
636      *                   {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
637      *                   then this parameter should contain the time (in 
638      *                   milliseconds) when it will be safe to wake the HQ up
639      *                   again. Otherwise this parameter will be ignored.
640      * @param forgetURI If true, the URI will be deleted from the queue.
641      * 
642      * @throws IllegalStateException if the CrawlURI
643      *         does not match a CrawlURI issued for crawling by this HQ's
644      *         {@link AdaptiveRevisitHostQueue#next() next()}.
645      * @throws IOException if an error occurs accessing the database
646      */
647     public void update(CrawlURI curi, 
648                        boolean needWait, 
649                        long wakeupTime, 
650                        boolean forgetURI) 
651             throws IllegalStateException, IOException{
652         if (logger.isLoggable(Level.FINE)) {
653             logger.fine("Updating " + curi.toString());
654         }
655         try{
656             // First add it to the regular queue (if not forgetting it).
657             if (forgetURI == false){
658                 OperationStatus opStatus = strictAdd(curi,false);
659                 if(opStatus != OperationStatus.SUCCESS){
660                     if(opStatus == OperationStatus.KEYEXIST){
661                         throw new IllegalStateException("Trying to update a" +
662                             " CrawlURI failed because it was in the queue" +
663                             " of URIs waiting for processing. URIs currently" +
664                             " being processsed can never be in that queue." +
665                             " HQ: " + hostName + ", CrawlURI: " + 
666                             curi.toString());
667                     }
668                 }
669 
670                 // Check if we need to update nextReadyTime
671                 long curiTimeOfNextProcessing = curi.getLong(
672                         A_TIME_OF_NEXT_PROCESSING);
673                 if(nextReadyTime > curiTimeOfNextProcessing){
674                     setNextReadyTime(curiTimeOfNextProcessing);
675                 }
676                 
677             } else {
678                 size--;
679             }
680             
681             // Then remove from list of in processing URIs
682             deleteInProcessing(curi.toString());
683             
684             inProcessing--;
685             
686             // Update the wakeUpTime slot.
687             if(needWait==false){
688                 // Ok, no wait then. Set wake up time to 0.
689                 wakeupTime = 0;
690             }
691 
692             updateWakeUpTimeSlot(wakeupTime);
693         } catch (DatabaseException e) {
694             // Blanket catch all DBExceptions and convert to IOExceptions.
695             IOException e2 = new IOException(e.getMessage());
696             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
697             throw e2; 
698         }
699     }
700 
701     /***
702      * Returns the 'top' URI in the AdaptiveRevisitHostQueue. 
703      * <p>
704      * HQ state will be set to {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} if this 
705      * method returns normally.
706  
707      * 
708      * @return a CrawlURI ready for processing
709      * 
710      * @throws IllegalStateException if the HostQueues current state is not
711      *         ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready}
712      * @throws IOException if an error occurs reading from the database
713      */
714     public CrawlURI next() throws IllegalStateException, IOException{
715         try{
716             // Ok, lets issue a URI, first check state and reserve slot.
717             if(getState()!=HQSTATE_READY || useWakeUpTimeSlot()==false){
718                 throw new IllegalStateException("Can not issue next URI when " +
719                         "HQ " + hostName + " state is " + getStateByName());
720             }
721     
722             DatabaseEntry keyEntry = new DatabaseEntry();
723             
724             // Get the top URI
725             CrawlURI curi = peek();
726             
727             // Add it to processingUriDB
728             addInProcessing(curi);
729             
730             // Delete it from the primaryUriDB
731             primaryKeyBinding.objectToEntry(curi.toString(),keyEntry);
732             OperationStatus opStatus = primaryUriDB.delete(null,keyEntry);
733             
734             if(opStatus != OperationStatus.SUCCESS){
735                 throw new IOException("Error occured removing URI: " +
736                         curi.toString() + " from HQ " + hostName + 
737                         " priority queue for processing. " + opStatus.toString());
738             }
739             
740             // Finally update nextReadyTime with new top if one exists.
741             CrawlURI top = peek();
742             long nextReady = Long.MAX_VALUE;
743             if(top != null){
744                 nextReady = top.getLong(
745                         A_TIME_OF_NEXT_PROCESSING);
746             }
747             inProcessing++;
748             setNextReadyTime(nextReady);
749             logger.fine("Issuing " + curi.toString());
750             return curi;
751         } catch (DatabaseException e) {
752             // Blanket catch all DBExceptions and convert to IOExceptions.
753             IOException e2 = new IOException(e.getMessage());
754             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
755             throw e2; 
756         }
757     }
758     
759     /***
760      * Returns the URI with the earliest time of next processing. I.e. the URI
761      * at the head of this host based priority queue.
762      * <p>
763      * Note: This method will return the head CrawlURI regardless of wether it 
764      * is safe to start processing it or not. CrawlURI will remain in the queue.
765      * The returned CrawlURI should only be used for queue inspection, it can 
766      * <i>not</i> be updated and returned to the queue. To get URIs ready for
767      * processing use {@link #next() next()}.
768      * 
769      * @return the URI with the earliest time of next processing or null if 
770      *         the queue is empty or all URIs are currently being processed.
771      * @throws IllegalStateException
772      * 
773      * @throws IOException if an error occurs reading from the database
774      */
775     public CrawlURI peek() throws IllegalStateException, IOException{
776         try{
777             
778             DatabaseEntry keyEntry = new DatabaseEntry();
779             DatabaseEntry dataEntry = new DatabaseEntry();
780             
781             CrawlURI curi = null;
782             Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
783             
784             OperationStatus opStatus = 
785                 secondaryCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
786             
787             if( opStatus == OperationStatus.SUCCESS){
788                 curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
789             } else {
790                 if( opStatus == OperationStatus.NOTFOUND ){
791                    curi = null;
792                 } else {
793                     throw new IOException("Error occured in " +
794                         "AdaptiveRevisitHostQueue.peek()." + opStatus.toString());
795                 }
796             }
797             secondaryCursor.close();
798             return curi;
799         } catch (DatabaseException e) {
800             // Blanket catch all DBExceptions and convert to IOExceptions.
801             IOException e2 = new IOException(e.getMessage());
802             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
803             throw e2; 
804         }
805     }
806     
807     
808     
809     /***
810      * Returns the current state of the HQ.
811      * 
812      * @return the current state of the HQ.
813      * 
814      * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY
815      * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY
816      * @see AdaptiveRevisitHostQueue#HQSTATE_READY
817      * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED
818      */
819     public int getState(){
820         if(state != HQSTATE_EMPTY){
821             // Need to confirm state
822             if(isBusy()){
823                 state = HQSTATE_BUSY;
824             } else {
825                 long currentTime = System.currentTimeMillis();
826                 long wakeTime = getEarliestWakeUpTimeSlot();
827                 
828                 if(wakeTime > currentTime || nextReadyTime > currentTime){
829                     state = HQSTATE_SNOOZED;
830                 } else {
831                     state = HQSTATE_READY;
832                 }
833             }
834         }
835         return state;
836     }
837     
838     /***
839      * Returns the time when the HQ will next be ready to issue a URI. 
840      * <p>
841      * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this 
842      * time will be in the future and reflects either the time when the HQ will
843      * again be able to issue URIs for processing because politness constraints
844      * have ended, or when a URI next becomes available for visit, whichever is
845      * larger.
846      * <p>
847      * If the queue is in a {@link #HQSTATE_READY ready} state this time will
848      * be in the past and reflect the earliest time when the HQ had a URI ready
849      * for processing, taking time spent snoozed for politness concerns into
850      * account. 
851      * <p>
852      * If the HQ is in any other state then the return value of this method is
853      * equal to Long.MAX_VALUE.
854      * <p>
855      * This value may change each time a URI is added, issued or updated.
856      * 
857      * @return the time when the HQ will next be ready to issue a URI
858      */
859     public long getNextReadyTime(){
860         if(getState()==HQSTATE_BUSY || getState()==HQSTATE_EMPTY){
861             // Have no idea when HQ next be able issue a URI
862             return Long.MAX_VALUE;
863         }
864         long wakeTime = getEarliestWakeUpTimeSlot();
865         return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
866     }
867     
868     /***
869      * Updates nextReadyTime (if smaller) with the supplied value
870      * @param newTime the new value of nextReady Time;
871      */
872     protected void setNextReadyTime(long newTime){
873         if(logger.isLoggable(Level.FINEST)){
874             logger.finest("Setting next ready to new value " + newTime + 
875                     " from " + getNextReadyTime());
876         }
877         nextReadyTime=newTime;
878         reorder();
879     }
880     
881     /***
882      * Method is called whenever something has been done that might have
883      * changed the value of the 'published' time of next ready. If an owner 
884      * has been specified it will be notified that the value may have changed..
885      */
886     protected void reorder(){
887         if(owner != null){
888             owner.reorder(this);
889         }
890     }
891 
892 
893     /***
894      * Same as {@link #getState() getState()} except this method returns a 
895      * human readable name for the state instead of its constant integer value.
896      * <p>
897      * Should only be used for reports, error messages and other strings 
898      * intended for human eyes.
899      * 
900      * @return the human readable name of the current state
901      */
902     public String getStateByName() {
903         switch(getState()){
904             case HQSTATE_BUSY : return "busy";
905             case HQSTATE_EMPTY : return "empty";
906             case HQSTATE_READY : return "ready";
907             case HQSTATE_SNOOZED : return "snoozed";
908         }
909         // This should be impossible unless new states are added without 
910         // updating this method.
911         return "undefined"; 
912     }
913     
914     /***
915      * Returns the size of the HQ. That is, the number of URIs queued, 
916      * including any that are currently being processed.
917      * 
918      * @return the size of the HQ.
919      */   
920     public long getSize(){
921         return size;
922     }
923     
924     /***
925      * Set the AdaptiveRevisitQueueList object that contains this HQ. Will cause
926      * that
927      * object to be notified (via 
928      * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
929      * reorder()} when the
930      * value used for sorting the list of HQs changes.
931      * @param owner the ARHostQueueList object that contains this HQ.
932      */
933     public void setOwner(AdaptiveRevisitQueueList owner) {
934         this.owner = owner;
935     }
936 
937     /***
938      * Cleanup all open Berkeley Database objects.
939      * <p>
940      * Does <I>not</I> close the Environment.
941      * 
942      * @throws IOException if an error occurs closing a database object
943      */
944     public void close() throws IOException{
945         try{
946             secondaryUriDB.close();
947             processingUriDB.close();
948             primaryUriDB.close();
949         } catch (DatabaseException e) {
950             // Blanket catch all DBExceptions and convert to IOExceptions.
951             IOException e2 = new IOException(e.getMessage());
952             e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace
953             throw e2; 
954         }
955     }
956     
957     
958     /***
959      * If true then the HQ has no available slot for issuing URIs. 
960      * <p>
961      * I.e. number of in processing URIs = valence.
962      * 
963      * @return true if number of in processing URIs = valence 
964      */
965     private boolean isBusy(){
966         return inProcessing == valence;
967     }
968     
969     /***
970      * Overwrites a used (-1) value in wakeUpTime[] with the supplied value.
971      * @param newVal
972      */
973     private void updateWakeUpTimeSlot(long newVal){
974         for(int i=0 ; i < valence ; i++){
975             if(wakeUpTime[i]==-1){
976                 wakeUpTime[i]=newVal;
977             }
978         }
979         reorder();
980     }
981     
982     /***
983      * A new URI is being issued. Set the wakeup time on an unused slot to -1.
984      * 
985      * @return true if a slot was successfully reserved. False otherwise.
986      */
987     private boolean useWakeUpTimeSlot(){
988         for(int i=0 ; i < valence ; i++){
989             if(wakeUpTime[i]>-1 && wakeUpTime[i]<=System.currentTimeMillis()){
990                 wakeUpTime[i]=-1;
991                 return true;
992             }
993         }
994         reorder();
995         return false;
996     }
997     
998     /***
999      * Returns the earliest time when a wake up slot will become available. If
1000      * one is already available then this time will be in the past.
1001      * <p>
1002      * If all slots are taken with URIs currently being processed (i.e. HQ state
1003      * is {@link #HQSTATE_BUSY busy} then this will return Long.MAX_VALUE;
1004      * @return the earliest time when a wake up slot will become available
1005      */
1006     private long getEarliestWakeUpTimeSlot(){
1007         long earliest = Long.MAX_VALUE;
1008         for(int i=0 ; i < valence ; i++){
1009             if(wakeUpTime[i]>-1 && wakeUpTime[i]<earliest){
1010                 earliest = wakeUpTime[i];
1011             }
1012         }
1013         return earliest;
1014     }
1015     
1016     /***
1017      * Returns a report detailing the status of this HQ.
1018      * @param max Maximum number of URIs to show. 0 equals no limit.
1019      * @return a report detailing the status of this HQ.
1020      */
1021     public String report(int max){
1022         try{
1023             StringBuffer ret = new StringBuffer(256);
1024             ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
1025             ret.append("Size:       " + size + "\n");
1026             ret.append("State:      " + getStateByName() + "\n");
1027             if(getState()==HQSTATE_BUSY){
1028                 ret.append("Processing URIs: \n");
1029                 Cursor processingCursor = processingUriDB.openCursor(null,null);
1030                 reportURIs(ret, processingCursor, valence);
1031                 processingCursor.close();
1032             } else {
1033                 ret.append("Next ready: " + 
1034                         ArchiveUtils.formatMillisecondsToConventional(
1035                             getNextReadyTime() - System.currentTimeMillis()) + 
1036                             "\n");
1037             }
1038             ret.append("Top URIs: \n");
1039             
1040             Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
1041             reportURIs(ret,secondaryCursor,max);
1042             secondaryCursor.close();
1043             return ret.toString();
1044         } catch( DatabaseException e ){
1045             return "Exception occured compiling report:\n" + e.getMessage();
1046         }
1047     }
1048     
1049     /***
1050      * Adds a report of the first <code>max</code> URIs that the cursor points
1051      * to to the stringbuffer object. 
1052      * 
1053      * @param ret The stringbuffer to append to
1054      * @param cursor The cursor pointing at a URI database
1055      * @param max Maximum number of URIs to report on. If fewer URIs are in the
1056      *            database, all URIs are shown
1057      * @throws DatabaseException if an error occurs
1058      */
1059     private void reportURIs(StringBuffer ret, Cursor cursor, int max) 
1060             throws DatabaseException{
1061         DatabaseEntry keyEntry = new DatabaseEntry();
1062         DatabaseEntry dataEntry = new DatabaseEntry();
1063         OperationStatus opStatus = 
1064             cursor.getFirst(keyEntry,dataEntry,LockMode.DEFAULT);
1065         if(max == 0){
1066         	// No limit on the number of values returned.
1067         	max = Integer.MAX_VALUE;
1068         }
1069         int i = 0;
1070         while(i<max && opStatus == OperationStatus.SUCCESS){
1071             CrawlURI tmp = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
1072             ret.append(" URI:                " + tmp.toString() + "\n");
1073             switch(tmp.getSchedulingDirective()){
1074                 case CandidateURI.HIGHEST : 
1075                     ret.append("  Sched. directive:  HIGHEST\n"); break;
1076                 case CandidateURI.HIGH : 
1077                     ret.append("  Sched. directive:  HIGH\n"); break;
1078                 case CandidateURI.MEDIUM : 
1079                     ret.append("  Sched. directive:  MEDIUM\n"); break;
1080                 case CandidateURI.NORMAL : 
1081                     ret.append("  Sched. directive:  NORMAL\n"); break;
1082             }
1083             ret.append("  Next processing:   ");
1084             long nextProcessing = 
1085                 tmp.getLong(A_TIME_OF_NEXT_PROCESSING) - 
1086                 System.currentTimeMillis();
1087             if(nextProcessing < 0){
1088                 ret.append("Overdue  ");
1089                 nextProcessing = nextProcessing*-1;
1090             } 
1091             ret.append(ArchiveUtils.formatMillisecondsToConventional(
1092                     nextProcessing) + "\n");
1093             if(tmp.getFetchStatus()!=0){
1094                 ret.append("  Last fetch status: " + 
1095                         tmp.getFetchStatus() + "\n");
1096             }
1097             if(tmp.containsKey(A_WAIT_INTERVAL)){
1098                 ret.append("  Wait interval:     " + 
1099                         ArchiveUtils.formatMillisecondsToConventional(
1100                                 tmp.getLong(A_WAIT_INTERVAL)) + "\n");
1101             }
1102             if(tmp.containsKey(A_NUMBER_OF_VISITS)){
1103                 ret.append("  Visits:            " + tmp.getInt(
1104                         A_NUMBER_OF_VISITS) + "\n");
1105             }
1106             if(tmp.containsKey(A_NUMBER_OF_VERSIONS)){
1107                 ret.append("  Versions:          " + tmp.getInt(
1108                         A_NUMBER_OF_VERSIONS) + "\n");
1109             }
1110             
1111             opStatus = cursor.getNext(keyEntry,dataEntry,LockMode.DEFAULT);
1112             i++;
1113         }
1114     }
1115     
1116     /***
1117      * Creates the secondary key for the secondary index.
1118      * <p>
1119      * The secondary index is the scheduling directive (first sorting) and 
1120      * the time of next processing (sorted from earlies to latest within each
1121      * scheduling directive). If the scheduling directive is missing or 
1122      * unknown NORMAL will be assumed.   
1123      */
1124     private static class OrderOfProcessingKeyCreator 
1125             extends TupleSerialKeyCreator {
1126 
1127         /***
1128          * Constructor. Invokes parent constructor.
1129          * 
1130          * @param classCatalog is the catalog to hold shared class information 
1131          *                     and for a database should be a 
1132          *                     StoredClassCatalog.
1133          * @param dataClass is the CrawlURI class. 
1134          */
1135         public OrderOfProcessingKeyCreator(ClassCatalog classCatalog, 
1136                 Class dataClass) {
1137             super(classCatalog, dataClass);
1138         }
1139 
1140         /* (non-Javadoc)
1141          * @see com.sleepycat.bind.serial.TupleSerialKeyCreator#createSecondaryKey(com.sleepycat.bind.tuple.TupleInput, java.lang.Object, com.sleepycat.bind.tuple.TupleOutput)
1142          */
1143         public boolean createSecondaryKey(TupleInput primaryKeyInput, 
1144                                           Object dataInput, 
1145                                           TupleOutput indexKeyOutput) {
1146             CrawlURI curi = (CrawlURI)dataInput;
1147             int directive = curi.getSchedulingDirective();
1148             // Can not rely on the default directive constants having a good
1149             // sort order
1150             switch (directive) {
1151             case CandidateURI.HIGHEST:
1152                 directive = 0;
1153                 break;
1154             case CandidateURI.HIGH:
1155                 directive = 1;
1156                 break;
1157             case CandidateURI.MEDIUM:
1158                 directive = 2;
1159                 break;
1160             case CandidateURI.NORMAL:
1161                 directive = 3;
1162                 break;
1163             default:
1164                 directive = 3; // If directive missing or unknown
1165             }
1166             
1167             indexKeyOutput.writeInt(directive);
1168             long timeOfNextProcessing =
1169                 curi.getLong(A_TIME_OF_NEXT_PROCESSING);
1170             
1171             indexKeyOutput.writeLong(timeOfNextProcessing);
1172             return true;
1173         }
1174     }
1175 
1176     /* (non-Javadoc)
1177      * @see org.archive.crawler.datamodel.CrawlSubstats.HasCrawlSubstats#getSubstats()
1178      */
1179     public CrawlSubstats getSubstats() {
1180         return substats;
1181     }
1182 
1183 }