View Javadoc

1   /* AdaptiveRevisitQueueList.java
2   *
3   * Created on Sep 13, 2004
4   *
5   * Copyright (C) 2004 Kristinn Sigurðsson.
6   *
7   * This file is part of the Heritrix web crawler (crawler.archive.org).
8   *
9   * Heritrix is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU Lesser Public License as published by
11  * the Free Software Foundation; either version 2.1 of the License, or
12  * any later version.
13  *
14  * Heritrix is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Lesser Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser Public License
20  * along with Heritrix; if not, write to the Free Software
21  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  */
23  package org.archive.crawler.frontier;
24  
25  import java.io.IOException;
26  import java.io.PrintWriter;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.TreeSet;
30  import java.util.logging.Level;
31  import java.util.logging.Logger;
32  
33  import org.archive.util.ArchiveUtils;
34  import org.archive.util.Reporter;
35  
36  import com.sleepycat.bind.EntryBinding;
37  import com.sleepycat.bind.serial.StoredClassCatalog;
38  import com.sleepycat.bind.tuple.IntegerBinding;
39  import com.sleepycat.bind.tuple.StringBinding;
40  import com.sleepycat.je.Cursor;
41  import com.sleepycat.je.Database;
42  import com.sleepycat.je.DatabaseConfig;
43  import com.sleepycat.je.DatabaseEntry;
44  import com.sleepycat.je.DatabaseException;
45  import com.sleepycat.je.Environment;
46  import com.sleepycat.je.LockMode;
47  import com.sleepycat.je.OperationStatus;
48  
49  /***
50   * Maintains an ordered list of {@link AdaptiveRevisitHostQueue}s used by a
51   * Frontier.
52   * <p> 
53   * The list is ordered by the 
54   * {@link AdaptiveRevisitHostQueue#getNextReadyTime() AdaptiveRevisitHostQueue.getNextReadyTime()}, 
55   * smallest value at the top of the list and then on in descending order.
56   * <p>
57   * The list will maintain a list of hostnames in a seperate DB. On creation a
58   * list will try to open the DB at a specified location. If it already exists
59   * the list will create HQs for all the hostnames in the list, discarding 
60   * those that turn out to be empty.
61   * <p>
62   * Any BDB DatabaseException will be converted to an IOException by public 
63   * methods. This includes preserving the original stacktrace, in favor of the
64   * one created for the IOException, so that the true source of the exception
65   * is not lost. 
66   *
67   * @author Kristinn Sigurdsson
68   */
69  public class AdaptiveRevisitQueueList implements Reporter {
70  
71      // TODO: Handle HQs becoming empty.
72      
73      /*** The Environment for the BerkleyDB databases in the HQs */
74      private final Environment env;
75      private final StoredClassCatalog catalog;
76      /*** Contains host names for all HQs. Name is key, valence is value */
77      private Database hostNamesDB;
78      private EntryBinding keyBinding;
79      private EntryBinding valueBinding;
80      
81      /*** A hash table of all the HQs (wrapped), keyed by hostName */
82      private HashMap<String,AdaptiveRevisitHostQueueWrapper> hostQueues; 
83      /*** Contains the hostQueues (wrapped) sorted by their time of next fetch */
84      private TreeSet<AdaptiveRevisitHostQueueWrapper> sortedHostQueues;
85      /*** Logger */
86      private static final Logger logger =
87          Logger.getLogger(AdaptiveRevisitQueueList.class.getName());
88  
89      
90      public AdaptiveRevisitQueueList(Environment env,
91              StoredClassCatalog catalog)
92      throws IOException {
93          Cursor cursor = null;
94          try {
95              this.env = env;
96              this.catalog = catalog;
97              keyBinding = new StringBinding();
98              valueBinding = new IntegerBinding();
99  
100             // Then initialize other data
101             hostQueues = new HashMap<String,AdaptiveRevisitHostQueueWrapper>();
102             sortedHostQueues = new TreeSet<AdaptiveRevisitHostQueueWrapper>();
103 
104             // Open the hostNamesDB
105             DatabaseConfig dbConfig = new DatabaseConfig();
106             dbConfig.setTransactional(false);
107             dbConfig.setAllowCreate(true);
108             hostNamesDB = env.openDatabase(null, "hostNames", dbConfig);
109 
110             // Read any existing hostNames and create relevant HQs
111             cursor = hostNamesDB.openCursor(null, null);
112             DatabaseEntry keyEntry = new DatabaseEntry();
113             DatabaseEntry dataEntry = new DatabaseEntry();
114             OperationStatus opStatus = cursor.getFirst(keyEntry, dataEntry,
115                     LockMode.DEFAULT);
116             while (opStatus == OperationStatus.SUCCESS) {
117                 // Got one!
118                 String hostName = (String)keyBinding.entryToObject(keyEntry);
119                 int valence = ((Integer)valueBinding.entryToObject(dataEntry)).
120                     intValue();
121                 opStatus = cursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT);
122                 // Create HQ
123                 createHQ(hostName, valence);
124                 // TODO: If the hq is empty, then it can be discarded
125                 // immediately
126             }
127         } catch (DatabaseException e) {
128             throw convertDbException(e);
129         } finally {
130             if (cursor != null) {
131                 try {
132                     cursor.close();
133                 } catch (DatabaseException e) {
134                     throw convertDbException(e);
135                 }
136             }
137         }
138     }
139         
140     private IOException convertDbException(Exception e) {
141         IOException e2 = new IOException(e.getMessage());
142         e2.setStackTrace(e.getStackTrace());
143         return e2;
144     }
145     
146     /***
147      * Get an AdaptiveRevisitHostQueue for the specified host.
148      * <p>
149      * If one does not already exist, null is returned 
150      * 
151      * @param hostName The host's name 
152      * @return an AdaptiveRevisitHostQueue for the specified host
153      */
154     public AdaptiveRevisitHostQueue getHQ(String hostName) {
155         AdaptiveRevisitHostQueueWrapper wrapper =
156             ((AdaptiveRevisitHostQueueWrapper)hostQueues.get(hostName));
157         if (wrapper != null) {
158             return wrapper.hq;
159         }
160         return null;
161     }
162     
163     /***
164      * Creates a new AdaptiveRevisitHostQueue.
165      * <p>
166      * If a HQ already existed for the specified hostName, the existing HQ
167      * is returned as it is. It's existing valence will <i>not</i> be updated
168      * to reflect a different valence.
169      * 
170      * @param hostName
171      * @param valence number of simultaneous connections allowed to this host
172      * @return the newly created HQ
173      * @throws IOException
174      */
175     public AdaptiveRevisitHostQueue createHQ(String hostName, int valence)
176             throws IOException{
177         AdaptiveRevisitHostQueueWrapper hqw = hostQueues.get(hostName);
178         if (hqw != null) {
179             return hqw.hq;
180         }
181         AdaptiveRevisitHostQueue hq;
182         // Ok, the HQ does not already exist. (Had to make sure) 
183         // Create it, save it and return it.
184         hq = new AdaptiveRevisitHostQueue(hostName, env, catalog, valence);
185         hq.setOwner(this);
186         
187         try{
188             DatabaseEntry keyEntry = new DatabaseEntry();
189             DatabaseEntry dataEntry = new DatabaseEntry();
190             keyBinding.objectToEntry(hostName,keyEntry);
191             valueBinding.objectToEntry(new Integer(valence),dataEntry);
192             hostNamesDB.put(null,keyEntry,dataEntry);
193             AdaptiveRevisitHostQueueWrapper tmp =
194                 new AdaptiveRevisitHostQueueWrapper(hq);
195             hostQueues.put(hostName,tmp);
196             sortedHostQueues.add(tmp);
197             return hq;
198         } catch (DatabaseException e) {
199             throw convertDbException(e);
200         }
201     }
202     
203     public AdaptiveRevisitHostQueue getTopHQ(){
204         AdaptiveRevisitHostQueueWrapper wrapper = 
205             (AdaptiveRevisitHostQueueWrapper)sortedHostQueues.first(); 
206         return wrapper.hq;
207     }
208 
209     /***
210      * Returns the number of URIs in all the HQs in this list
211      * @return the number of URIs in all the HQs in this list
212      */
213     public long getSize() {
214     	long size = 0;
215         for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
216             AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
217                     .next()).hq;
218             size += hq.getSize();
219         }
220         return size;
221 	}
222     
223     /***
224      * Returns the average depth of all the HQs in this list
225      * @return the average depth of all the HQs in this list (rounded down)
226      */
227     public long getAverageDepth() {
228     	long size = getSize();
229     	return size/hostQueues.size();
230     }
231     
232     /***
233      * Returns the size of the largest (deepest) queue.
234      * @return the size of the largest (deepest) queue.
235      */
236     public long getDeepestQueueSize(){
237     	long size = 0;
238         for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
239             AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
240                     .next()).hq;
241             if(hq.getSize() > size){
242             	size = hq.getSize();
243             }
244         }
245         return size;
246     }
247     
248     /***
249      * Returns the congestion ratio.
250      * <p>
251      * The congestion ratio is equal to the total number of queues divided
252      * by the number of queues currently being processed or are snozzed (i.e. 
253      * not ready). A congestion ratio of 1 indicates no congestion.
254      * @return the congestion ratio
255      */
256     public float getCongestionRatio(){
257     	int readyQueues = 0;
258         for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
259             AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
260                     .next()).hq;
261             if(hq.getState() == AdaptiveRevisitHostQueue.HQSTATE_READY){
262             	readyQueues++;
263             }
264         }
265         int totalQueues = hostQueues.size();
266         
267         return (float)(totalQueues) / (totalQueues-readyQueues);
268     }
269     
270     /***
271      * This method reorders the host queues. Method is only called by the
272      * AdaptiveRevisitHostQueue that it 'owns' when their reported time of next
273      * ready is being updated.
274      * 
275      * @param hq The calling HQ
276      */
277     protected void reorder(AdaptiveRevisitHostQueue hq){
278         // Find the wrapper
279         AdaptiveRevisitHostQueueWrapper wrapper = 
280             (AdaptiveRevisitHostQueueWrapper)hostQueues.get(hq.getHostName());
281         
282         long newTime = hq.getNextReadyTime();
283         
284         if(newTime != wrapper.nextReadyTime){
285             // Ok, the time has changed, move the queue around.
286             if (logger.isLoggable(Level.FINER)) {
287                 logger.finer("reorder(" + hq.getHostName() + ") was "
288                         + wrapper.nextReadyTime);
289             }
290             // Remove it from the sorted list
291             sortedHostQueues.remove(wrapper);
292             // Update the time on the ref.
293             wrapper.nextReadyTime = newTime;
294             if (logger.isLoggable(Level.FINER)) {
295                 logger.finer("reorder(" + hq.getHostName() + ") is "
296                         + wrapper.nextReadyTime);
297             }
298             // Readd to the list
299             sortedHostQueues.add(wrapper);
300         }
301     }
302     
303     /***
304      * The total number of URIs queued in all the HQs belonging to this list.
305      * 
306      * @return total number of URIs queued in all the HQs belonging to this list.
307      */
308     public long getUriCount(){
309         Iterator it = hostQueues.keySet().iterator();
310         long count = 0;
311         while(it.hasNext()){
312             AdaptiveRevisitHostQueueWrapper wrapper =
313                 (AdaptiveRevisitHostQueueWrapper)it.next();
314             count += wrapper.hq.getSize();
315         }
316         return count;
317     }
318         
319     /***
320      * This class wraps an AdaptiveRevisitHostQueue with a fixed value for next
321      * ready time.
322      * This is done to facilitate sorting by making sure that the value does 
323      * not change while the HQ is in the sorted list. With this wrapper, it
324      * is possible to remove it from the sorted list, then update the time of
325      * next ready, and then readd (resort) it. 
326      *
327      * @author Kristinn Sigurdsson
328      */
329     private class AdaptiveRevisitHostQueueWrapper implements Comparable{
330         long nextReadyTime;
331         AdaptiveRevisitHostQueue hq;
332         
333         public AdaptiveRevisitHostQueueWrapper(AdaptiveRevisitHostQueue hq){
334             nextReadyTime = hq.getNextReadyTime();
335             this.hq = hq;
336         }
337         
338         /***
339          * Compares the this ARHQWrapper to the supplied one.
340          * 
341          * @param obj the HQ to compare to. If this object is not an instance
342          *             of ARHostQueueWrapper then the method will throw a 
343          *             ClassCastException.
344          * @return a value less than 0 if this HQWrappers time of next ready
345          *         value is less than the argument HQWrappers's; and a value 
346          *         greater than 0 if this value is greater. If the time of
347          *         next ready is equal, the hostName strings will be compared
348          *         and that result returned.
349          */
350         public int compareTo(Object obj){
351             AdaptiveRevisitHostQueueWrapper comp =
352                 (AdaptiveRevisitHostQueueWrapper)obj;
353 
354             long compTime = comp.nextReadyTime;
355             
356             if(nextReadyTime>compTime){
357                 return 1;
358             } else if(nextReadyTime<compTime){
359                 return -1;
360             } else {
361                 // Equal time. Use hostnames
362                 return hq.getHostName().compareTo(comp.hq.getHostName());
363             }
364         }
365     }
366     
367     /***
368      * Closes all HQs and the Environment. 
369      */
370     public void close(){
371         Iterator it = sortedHostQueues.iterator();
372         while(it.hasNext()){
373             AdaptiveRevisitHostQueue hq =
374                 ((AdaptiveRevisitHostQueueWrapper)it.next()).hq;
375             try {
376                 hq.close();
377             } catch (IOException e) {
378                 logger.severe("IOException while closing " + hq.getHostName() +
379                         "\n" + e.getMessage());
380             }
381         }
382         try {
383             hostNamesDB.close();
384         } catch (DatabaseException e) {
385             logger.severe("IOException while closing hostNamesDB" +
386                     "\n" + e.getMessage());
387         }
388     }
389     
390     //
391     // Reporter implementation
392     //
393     
394     public String[] getReports() {
395         // none but default for now
396         return new String[] {};
397     }
398     
399     /* (non-Javadoc)
400      * @see org.archive.util.Reporter#singleLineReport()
401      */
402     public String singleLineReport() {
403         return ArchiveUtils.singleLineReport(this);
404     }
405 
406     /* (non-Javadoc)
407      * @see org.archive.util.Reporter#reportTo(java.io.Writer)
408      */
409     public void reportTo(PrintWriter writer) {
410         Iterator it = sortedHostQueues.iterator();
411         while(it.hasNext()){
412             AdaptiveRevisitHostQueueWrapper wrapper =
413                 (AdaptiveRevisitHostQueueWrapper)it.next();
414           
415             writer.print(wrapper.hq.report(10));
416             writer.print("\n\n");
417         }
418     }
419     
420     public void reportTo(String name, PrintWriter writer) {
421         if(name==null || hostQueues.containsKey(name)==false){
422         	reportTo(writer);
423         } else {
424 	    	AdaptiveRevisitHostQueueWrapper wrapper =
425 	                (AdaptiveRevisitHostQueueWrapper)hostQueues.get(name);
426 	          
427 	        writer.print(wrapper.hq.report(0));
428 	        writer.print("\n\n");
429         }
430     }
431     
432     public void singleLineReportTo(PrintWriter writer) {
433         Iterator it = sortedHostQueues.iterator();
434         int total = 0;
435         int ready = 0;
436         int snoozed = 0;
437         int empty = 0;
438         int busy = 0;
439         while(it.hasNext()){
440             AdaptiveRevisitHostQueueWrapper wrapper =
441                 (AdaptiveRevisitHostQueueWrapper)it.next();
442             total++;
443             switch(wrapper.hq.getState()){
444                 case AdaptiveRevisitHostQueue.HQSTATE_BUSY : busy++; break;
445                 case AdaptiveRevisitHostQueue.HQSTATE_EMPTY : empty++; break;
446                 case AdaptiveRevisitHostQueue.HQSTATE_READY : ready++; break;
447                 case AdaptiveRevisitHostQueue.HQSTATE_SNOOZED : snoozed++; break;
448             }
449         }
450         writer.print(total + " queues: " + ready + " ready, " + snoozed + 
451             " snoozed, " + busy + " busy, and " + empty + " empty");
452     }
453     
454     /* (non-Javadoc)
455      * @see org.archive.util.Reporter#singleLineLegend()
456      */
457     public String singleLineLegend() {
458         return "total ready snoozed busy empty";
459     }
460 
461 }