View Javadoc

1   /* Copyright (C) 2009 Internet Archive
2    *
3    * This file is part of the Heritrix web crawler (crawler.archive.org).
4    *
5    * Heritrix is free software; you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser Public License as published by
7    * the Free Software Foundation; either version 2.1 of the License, or
8    * any later version.
9    *
10   * Heritrix is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser Public License
16   * along with Heritrix; if not, write to the Free Software
17   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   *
19    */
20  package org.archive.crawler.frontier;
21  
22  import java.io.IOException;
23  import java.io.PrintWriter;
24  import java.io.Serializable;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.Queue;
30  import java.util.SortedSet;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.TreeSet;
34  import java.util.concurrent.BlockingQueue;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.TimeUnit;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  import org.apache.commons.collections.Bag;
42  import org.apache.commons.collections.BagUtils;
43  import org.apache.commons.collections.bag.HashBag;
44  import org.apache.commons.lang.StringUtils;
45  import org.archive.crawler.datamodel.CandidateURI;
46  import org.archive.crawler.datamodel.CoreAttributeConstants;
47  import org.archive.crawler.datamodel.CrawlURI;
48  import org.archive.crawler.datamodel.FetchStatusCodes;
49  import org.archive.crawler.datamodel.UriUniqFilter;
50  import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
51  import org.archive.crawler.framework.CrawlController;
52  import org.archive.crawler.framework.Frontier;
53  import org.archive.crawler.framework.exceptions.EndedException;
54  import org.archive.crawler.framework.exceptions.FatalConfigurationException;
55  import org.archive.crawler.settings.SimpleType;
56  import org.archive.crawler.settings.Type;
57  import org.archive.net.UURI;
58  import org.archive.util.ArchiveUtils;
59  import org.archive.util.ObjectIdentityCache;
60  import org.archive.util.ObjectIdentityMemCache;
61  
62  import com.sleepycat.collections.StoredIterator;
63  
64  /***
65   * A common Frontier base using several queues to hold pending URIs. 
66   * 
67   * Uses in-memory map of all known 'queues' inside a single database.
68   * Round-robins between all queues.
69   *
70   * @author Gordon Mohr
71   * @author Christian Kohlschuetter
72   */
73  public abstract class WorkQueueFrontier extends AbstractFrontier
74  implements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver,
75          Serializable {
76  	private static final long serialVersionUID = 570384305871965843L;
77  	
78      public class WakeTask extends TimerTask {
79          @Override
80          public void run() {
81              synchronized(snoozedClassQueues) {
82                  if(this!=nextWake) {
83                      // an intervening waketask was made
84                      return;
85                  }
86                  wakeQueues();
87              }
88          }
89      }
90  
91      /*** truncate reporting of queues at some large but not unbounded number */
92      private static final int REPORT_MAX_QUEUES = 2000;
93      
94      /***
95       * If we know that only a small amount of queues is held in memory,
96       * we can avoid using a disk-based BigMap.
97       * This only works efficiently if the WorkQueue does not hold its
98       * entries in memory as well.
99       */ 
100     private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;
101 
102     /***
103      * When a snooze target for a queue is longer than this amount, and 
104      * there are already ready queues, deactivate rather than snooze 
105      * the current queue -- so other more responsive sites get a chance
106      * in active rotation. (As a result, queue's next try may be much
107      * further in the future than the snooze target delay.)
108      */
109     public final static String ATTR_SNOOZE_DEACTIVATE_MS =
110         "snooze-deactivate-ms";
111     public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(5*60*1000); // 5 minutes
112     
113     private static final Logger logger =
114         Logger.getLogger(WorkQueueFrontier.class.getName());
115     
116     /*** whether to hold queues INACTIVE until needed for throughput */
117     public final static String ATTR_HOLD_QUEUES = "hold-queues";
118     protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(true); 
119 
120     /*** amount to replenish budget on each activation (duty cycle) */
121     public final static String ATTR_BALANCE_REPLENISH_AMOUNT =
122         "balance-replenish-amount";
123     protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT =
124         new Integer(3000);
125     
126     /*** whether to hold queues INACTIVE until needed for throughput */
127     public final static String ATTR_ERROR_PENALTY_AMOUNT =
128         "error-penalty-amount";
129     protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT =
130         new Integer(100);
131 
132 
133     /*** total expenditure to allow a queue before 'retiring' it  */
134     public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";
135     protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(-1);
136 
137     /*** cost assignment policy to use (by class name) */
138     public final static String ATTR_COST_POLICY = "cost-policy";
139     protected final static String DEFAULT_COST_POLICY =
140         UnitCostAssignmentPolicy.class.getName();
141 
142     /*** target size of ready queues backlog */
143     public final static String ATTR_TARGET_READY_QUEUES_BACKLOG =
144         "target-ready-backlog";
145     protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG =
146         new Integer(50);
147     
    /**
     * Those UURIs which are already in-process (or processed), and thus
     * should not be rescheduled. Created in initialize() via
     * createAlreadyIncluded(); closed and cleared in crawlEnded().
     */
    protected transient UriUniqFilter alreadyIncluded;

    /**
     * All known queues, keyed by class key (classKey -> WorkQueue).
     * initialize() chooses either an in-memory cache or a controller BigMap.
     */
    protected transient ObjectIdentityCache<String,WorkQueue> allQueues = null; 

    /**
     * Class keys of all per-class queues whose first item may be handed out.
     * By default an unbounded LinkedBlockingQueue (see initQueuesOfQueues);
     * subclasses may substitute another BlockingQueue.
     */
    protected BlockingQueue<String> readyClassQueues;

    /**
     * Target (minimum) size to keep readyClassQueues. Clamped to at least 1
     * where this class assigns it (initialize, kickUpdate).
     */
    protected int targetSizeForReadyQueues;

    /**
     * Single-thread access to the ready-filling code in next(): threads use
     * tryAcquire, so those that cannot get the permit skip filling rather
     * than wait.
     */
    protected transient Semaphore readyFiller = new Semaphore(1);

    /**
     * Class keys of all 'inactive' queues, not yet in active rotation.
     */
    protected Queue<String> inactiveQueues;

    /**
     * Class keys of 'retired' queues (over budget), no longer considered for
     * activation. kickUpdate() moves them back to the inactive state.
     */
    protected Queue<String> retiredQueues;

    /**
     * Per-class queues from which a URI is outstanding. next() adds a queue
     * each time it emits a URI, so a queue may appear more than once.
     * NOTE(review): removal presumably happens when a URI finishes; that code
     * is not visible here.
     */
    protected Bag inProcessQueues = 
        BagUtils.synchronizedBag(new HashBag()); // of WorkQueue

    /**
     * All per-class queues held in snoozed state, sorted by wake time.
     * A synchronized sorted set; WakeTask synchronizes on it while waking.
     */
    protected SortedSet<WorkQueue> snoozedClassQueues;

    /**
     * Timer for tasks which wake head item of snoozedClassQueues.
     * Created in initialize(), cancelled in crawlEnded().
     */
    protected transient Timer wakeTimer;

    /** Task for next wake; older WakeTasks that fire do nothing. */ 
    protected transient WakeTask nextWake; 

    /**
     * Queue most recently observed to be the longest non-retired queue,
     * updated in sendToQueue().
     * NOTE(review): written while holding only the lock of the queue being
     * updated, and not volatile, so other threads may see stale values;
     * appears to be informational only - confirm before relying on it.
     */
    protected WorkQueue longestActiveQueue = null;

    /** How long (ms) next() waits for a ready queue on each pass. */
    private static final long DEFAULT_WAIT = 1000; // 1 second

    /** A policy for assigning 'cost' values to CrawlURIs; see initCostPolicy(). */
    private transient CostAssignmentPolicy costAssignmentPolicy;

    /**
     * All policies available to be chosen as the ATTR_COST_POLICY setting.
     * NOTE(review): a non-final, package-private instance field despite its
     * constant-style name.
     */
    String[] AVAILABLE_COST_POLICIES = new String[] {
            ZeroCostAssignmentPolicy.class.getName(),
            UnitCostAssignmentPolicy.class.getName(),
            WagCostAssignmentPolicy.class.getName(),
            AntiCalendarCostAssignmentPolicy.class.getName()};
210 
211     /***
212      * Create the CommonFrontier
213      * 
214      * @param name
215      * @param description
216      */
217     public WorkQueueFrontier(String name, String description) {
218         // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME)
219         // therefore we'll ignore the supplied parameter.
220         super(Frontier.ATTR_NAME, description);
221         Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES,
222             "Whether to hold newly-created per-host URI work " +
223             "queues until needed to stay busy. If true (the default), " +
224             "queues begin (and collect URIs) in an 'inactive' state, and " +
225             "only when the Frontier needs another queue to keep all " +
226             "ToeThreads busy will new queues be activated. If false, all " +
227             "queues contribute URIs for crawling at all times in a " +
228             "round-robin fashion. (This mode is less likely to make " +
229             "effective use of database and IO caches.)", DEFAULT_HOLD_QUEUES));
230         t.setExpertSetting(true);
231         t.setOverrideable(false);
232         t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT,
233             "Amount to replenish a queue's activity balance when it becomes " +
234             "active. Larger amounts mean more URIs will be tried from the " +
235             "queue before it is deactivated in favor of waiting queues. " +
236             "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT));
237         t.setExpertSetting(true);
238         t.setOverrideable(true);
239         t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT,
240                 "Amount to additionally penalize a queue when one of" +
241                 "its URIs fails completely. Accelerates deactivation or " +
242                 "full retirement of problem queues and unresponsive sites. " +
243                 "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT));
244         t.setExpertSetting(true);
245         t.setOverrideable(true);
246         t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET,
247             "Total activity expenditure allowable to a single queue; queues " +
248             "over this expenditure will be 'retired' and crawled no more. " +
249             "Default of -1 means no ceiling on activity expenditures is " +
250             "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));
251         t.setExpertSetting(true);
252         t.setOverrideable(true);
253 
254         t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY,
255                 "Policy for calculating the cost of each URI attempted. " +
256                 "The default UnitCostAssignmentPolicy considers the cost of " +
257                 "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES));
258         t.setExpertSetting(true);
259         
260         t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS,
261                 "Threshold above which any 'snooze' delay will cause the " +
262                 "affected queue to go inactive, allowing other queues a " +
263                 "chance to rotate into active state. Typically set to be " +
264                 "longer than the politeness pauses between successful " +
265                 "fetches, but shorter than the connection-failed " +
266                 "'retry-delay-seconds'. (Default is 5 minutes.)", 
267                 DEFAULT_SNOOZE_DEACTIVATE_MS));
268         t.setExpertSetting(true);
269         t.setOverrideable(false);
270         t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG,
271                 "Target size for backlog of ready queues. This many queues " +
272                 "will be brought into 'ready' state even if a thread is " +
273                 "not waiting. Only has effect if 'hold-queues' is true. " +
274                 "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG));
275         t.setExpertSetting(true);
276         t.setOverrideable(false);
277     }
278 
279     /***
280      * Initializes the Frontier, given the supplied CrawlController.
281      *
282      * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)
283      */
284     public void initialize(CrawlController c)
285             throws FatalConfigurationException, IOException {
286         // Call the super method. It sets up frontier journalling.
287         super.initialize(c);
288         this.controller = c;
289         
290         initQueuesOfQueues();
291         
292         this.targetSizeForReadyQueues = (Integer)getUncheckedAttribute(null,
293             ATTR_TARGET_READY_QUEUES_BACKLOG);
294         if (this.targetSizeForReadyQueues < 1) {
295             this.targetSizeForReadyQueues = 1;
296         }
297         this.wakeTimer = new Timer("waker for " + c.toString());
298         
299         try {
300             if (workQueueDataOnDisk()
301                     && getQueueAssignmentPolicy(null).maximumNumberOfKeys() >= 0
302                     && getQueueAssignmentPolicy(null).maximumNumberOfKeys() <= 
303                         MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
304                 this.allQueues = new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
305             } else {
306                 this.allQueues = c.getBigMap("allqueues", WorkQueue.class);
307                 if (logger.isLoggable(Level.FINE)) {
308                     Iterator<String> i = this.allQueues.keySet().iterator();
309                     try {
310                         for (; i.hasNext();) {
311                             logger.fine((String) i.next());
312                         }
313                     } finally {
314                         StoredIterator.close(i);
315                     }
316                 }
317             }
318             this.alreadyIncluded = createAlreadyIncluded();
319             initQueue();
320         } catch (IOException e) {
321             e.printStackTrace();
322             throw (FatalConfigurationException)
323                 new FatalConfigurationException(e.getMessage()).initCause(e);
324         } catch (Exception e) {
325             e.printStackTrace();
326             throw (FatalConfigurationException)
327                 new FatalConfigurationException(e.getMessage()).initCause(e);
328         }
329         
330         initCostPolicy();
331         
332         loadSeeds();
333     }
334     
335     /***
336      * Set up the various queues-of-queues used by the frontier. Override
337      * in implementing subclasses to reduce or eliminate risk of queues
338      * growing without bound. 
339      */
340     protected void initQueuesOfQueues() {
341         // small risk of OutOfMemoryError: if 'hold-queues' is false,
342         // readyClassQueues may grow in size without bound
343         readyClassQueues = new LinkedBlockingQueue<String>();
344         // risk of OutOfMemoryError: in large crawls, 
345         // inactiveQueues may grow in size without bound
346         inactiveQueues = new LinkedBlockingQueue<String>();
347         // risk of OutOfMemoryError: in large crawls with queue max-budgets, 
348         // inactiveQueues may grow in size without bound
349         retiredQueues = new LinkedBlockingQueue<String>();
350         // small risk of OutOfMemoryError: in large crawls with many 
351         // unresponsive queues, an unbounded number of snoozed queues 
352         // may exist
353         snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
354     }
355 
356     /***
357      * Set (or reset after configuration change) the cost policy in effect.
358      * 
359      * @throws FatalConfigurationException
360      */
361     private void initCostPolicy() throws FatalConfigurationException {
362         try {
363             costAssignmentPolicy = (CostAssignmentPolicy) Class.forName(
364                     (String) getUncheckedAttribute(null, ATTR_COST_POLICY))
365                     .newInstance();
366         } catch (Exception e) {
367             e.printStackTrace();
368             throw new FatalConfigurationException(e.getMessage());
369         }
370     }
371 
372     /* (non-Javadoc)
373      * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String)
374      */
375     public void crawlEnded(String sExitMessage) {
376         // Cleanup.  CrawlJobs persist after crawl has finished so undo any
377         // references.
378         if (this.alreadyIncluded != null) {
379             this.alreadyIncluded.close();
380             this.alreadyIncluded = null;
381         }
382         
383         try {
384             closeQueue();
385         } catch (IOException e) {
386             // FIXME exception handling
387             e.printStackTrace();
388         }
389         this.wakeTimer.cancel();
390         
391         this.allQueues.close();
392         this.allQueues = null;
393         this.inProcessQueues = null;
394         this.readyClassQueues = null;
395         this.snoozedClassQueues = null;
396         this.inactiveQueues = null;
397         this.retiredQueues = null;
398         
399         this.costAssignmentPolicy = null;
400         
401         // Clearing controller is a problem. We get NPEs in #preNext.
402         super.crawlEnded(sExitMessage);
403         this.controller = null;
404     }
405 
    /**
     * Create a UriUniqFilter that will serve as the record
     * of already seen URIs.
     * <p>
     * Called once from initialize(); schedule() adds every candidate to the
     * returned filter.
     *
     * @return a UriUniqFilter that will serve as a record of already seen URIs
     * @throws IOException if the filter cannot be created
     */
    protected abstract UriUniqFilter createAlreadyIncluded() throws IOException;
414 
415     /***
416      * Arrange for the given CandidateURI to be visited, if it is not
417      * already scheduled/completed.
418      *
419      * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
420      */
421     public void schedule(CandidateURI caUri) {
422         // Canonicalization may set forceFetch flag.  See
423         // #canonicalization(CandidateURI) javadoc for circumstance.
424         String canon = canonicalize(caUri);
425         if (caUri.forceFetch()) {
426             alreadyIncluded.addForce(canon, caUri);
427         } else {
428             alreadyIncluded.add(canon, caUri);
429         }
430     }
431 
432     /***
433      * Accept the given CandidateURI for scheduling, as it has
434      * passed the alreadyIncluded filter. 
435      * 
436      * Choose a per-classKey queue and enqueue it. If this
437      * item has made an unready queue ready, place that 
438      * queue on the readyClassQueues queue. 
439      * @param caUri CandidateURI.
440      */
441     public void receive(CandidateURI caUri) {
442         CrawlURI curi = asCrawlUri(caUri);
443         applySpecialHandling(curi);
444         sendToQueue(curi);
445         // Update recovery log.
446         doJournalAdded(curi);
447     }
448 
449 	/* (non-Javadoc)
450 	 * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI)
451 	 */
452 	protected CrawlURI asCrawlUri(CandidateURI caUri) {
453 		CrawlURI curi = super.asCrawlUri(caUri);
454 		// force cost to be calculated, pre-insert
455 		getCost(curi);
456 		return curi;
457 	}
458 	
459     /***
460      * Send a CrawlURI to the appropriate subqueue.
461      * 
462      * @param curi
463      */
464     protected void sendToQueue(CrawlURI curi) {
465         WorkQueue wq = getQueueFor(curi);
466         synchronized (wq) {
467             wq.enqueue(this, curi);
468             if(!wq.isRetired()) {
469                 incrementQueuedUriCount();
470             }
471             if(!wq.isHeld()) {
472                 wq.setHeld();
473                 if(holdQueues() && readyClassQueues.size()>=targetSizeForReadyQueues()) {
474                     deactivateQueue(wq);
475                 } else {
476                     replenishSessionBalance(wq);
477                     readyQueue(wq);
478                 }
479             }
480             WorkQueue laq = longestActiveQueue;
481             if(!wq.isRetired()&&((laq==null) || wq.getCount() > laq.getCount())) {
482                 longestActiveQueue = wq; 
483             }
484         }
485     }
486 
487     /***
488      * Whether queues should start inactive (only becoming active when needed
489      * to keep the crawler busy), or if queues should start out ready.
490      * 
491      * @return true if new queues should held inactive
492      */
493     private boolean holdQueues() {
494         return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
495                 .booleanValue();
496     }
497 
498     /***
499      * Put the given queue on the readyClassQueues queue
500      * @param wq
501      */
502     private void readyQueue(WorkQueue wq) {
503         try {
504             wq.setActive(this, true);
505             readyClassQueues.put(wq.getClassKey());
506         } catch (InterruptedException e) {
507             e.printStackTrace();
508             System.err.println("unable to ready queue "+wq);
509             // propagate interrupt up 
510             throw new RuntimeException(e);
511         }
512     }
513 
514     /***
515      * Put the given queue on the inactiveQueues queue
516      * @param wq
517      */
518     private void deactivateQueue(WorkQueue wq) {
519 //        try {
520             wq.setSessionBalance(0); // zero out session balance
521             inactiveQueues.add(wq.getClassKey());
522             wq.setActive(this, false);
523 //        } catch (InterruptedException e) {
524 //            e.printStackTrace();
525 //            System.err.println("unable to deactivate queue "+wq);
526 //            // propagate interrupt up 
527 //            throw new RuntimeException(e);
528 //        }
529     }
530     
531     /***
532      * Put the given queue on the retiredQueues queue
533      * @param wq
534      */
535     private void retireQueue(WorkQueue wq) {
536 //        try {
537             retiredQueues.add(wq.getClassKey());
538             decrementQueuedCount(wq.getCount());
539             wq.setRetired(true);
540             wq.setActive(this, false);
541 //        } catch (InterruptedException e) {
542 //            e.printStackTrace();
543 //            System.err.println("unable to retire queue "+wq);
544 //            // propagate interrupt up 
545 //            throw new RuntimeException(e);
546 //        }
547     }
548     
549     /*** 
550      * Accomodate any changes in settings.
551      * 
552      * @see org.archive.crawler.framework.Frontier#kickUpdate()
553      */
554     public void kickUpdate() {
555         super.kickUpdate();
556         int target = (Integer)getUncheckedAttribute(null,
557                 ATTR_TARGET_READY_QUEUES_BACKLOG);
558         if (target < 1) {
559             target = 1;
560         }
561         this.targetSizeForReadyQueues = target; 
562         try {
563             initCostPolicy();
564         } catch (FatalConfigurationException fce) {
565             throw new RuntimeException(fce);
566         }
567         // The rules for a 'retired' queue may have changed; so,
568         // unretire all queues to 'inactive'. If they still qualify
569         // as retired/overbudget next time they come up, they'll
570         // be re-retired; if not, they'll get a chance to become
571         // active under the new rules.
572         Object key = this.retiredQueues.poll();
573         while (key != null) {
574             WorkQueue q = (WorkQueue)this.allQueues.get((String)key);
575             if(q != null) {
576                 unretireQueue(q);
577             }
578             key = this.retiredQueues.poll();
579         }
580     }
581     /***
582      * Restore a retired queue to the 'inactive' state. 
583      * 
584      * @param q
585      */
586     private void unretireQueue(WorkQueue q) {
587         deactivateQueue(q);
588         q.setRetired(false); 
589         incrementQueuedUriCount(q.getCount());
590     }
591 
    /**
     * Return the work queue for the given CrawlURI's classKey. URIs
     * are ordered and politeness-delayed within their 'class'.
     * If the requested queue is not found, a new instance is created.
     * <p>
     * sendToQueue() relies on this never returning null.
     *
     * @param curi CrawlURI to base queue on
     * @return the found or created WorkQueue
     */
    protected abstract WorkQueue getQueueFor(CrawlURI curi);
601 
    /**
     * Return the work queue for the given classKey, or null
     * if no such queue exists. Unlike getQueueFor(CrawlURI), no queue is
     * created.
     *
     * @param classKey key to look for
     * @return the found WorkQueue, or null
     */
    protected abstract WorkQueue getQueueFor(String classKey);
610     
611     /***
612      * Return the next CrawlURI to be processed (and presumably
613      * visited/fetched) by a a worker thread.
614      *
615      * Relies on the readyClassQueues having been loaded with
616      * any work queues that are eligible to provide a URI. 
617      *
618      * @return next CrawlURI to be processed. Or null if none is available.
619      *
620      * @see org.archive.crawler.framework.Frontier#next()
621      */
622     public CrawlURI next()
623     throws InterruptedException, EndedException {
624         while (true) { // loop left only by explicit return or exception
625             long now = System.currentTimeMillis();
626 
627             // Do common checks for pause, terminate, bandwidth-hold
628             preNext(now);
629             
630             // allow up-to-1 thread to fill readyClassQueues to target
631             if(readyFiller.tryAcquire()) {
632                 try {
633                     int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
634                     while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
635                         activateInactiveQueue();
636                         activationsNeeded--;
637                     }
638                 } finally {
639                     readyFiller.release();
640                 }
641             }
642                    
643             WorkQueue readyQ = null;
644             Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);
645             if (key != null) {
646                 readyQ = (WorkQueue)this.allQueues.get((String)key);
647             }
648             if (readyQ != null) {
649                 while(true) { // loop left by explicit return or break on empty
650                     CrawlURI curi = null;
651                     synchronized(readyQ) {
652                         curi = readyQ.peek(this);                     
653                         if (curi != null) {
654                             // check if curi belongs in different queue
655                             String currentQueueKey = getClassKey(curi);
656                             if (currentQueueKey.equals(curi.getClassKey())) {
657                                 // curi was in right queue, emit
658                                 noteAboutToEmit(curi, readyQ);
659                                 inProcessQueues.add(readyQ);
660                                 return curi;
661                             }
662                             // URI's assigned queue has changed since it
663                             // was queued (eg because its IP has become
664                             // known). Requeue to new queue.
665                             curi.setClassKey(currentQueueKey);
666                             readyQ.dequeue(this);
667                             decrementQueuedCount(1);
668                             curi.setHolderKey(null);
669                             // curi will be requeued to true queue after lock
670                             //  on readyQ is released, to prevent deadlock
671                         } else {
672                             // readyQ is empty and ready: it's exhausted
673                             // release held status, allowing any subsequent 
674                             // enqueues to again put queue in ready
675                             readyQ.clearHeld();
676                             break;
677                         }
678                     }
679                     if(curi!=null) {
680                         // complete the requeuing begun earlier
681                         sendToQueue(curi);
682                     }
683                 }
684             } else {
685                 // ReadyQ key wasn't in all queues: unexpected
686                 if (key != null) {
687                     logger.severe("Key "+ key +
688                         " in readyClassQueues but not allQueues");
689                 }
690             }
691 
692             if(shouldTerminate) {
693                 // skip subsequent steps if already on last legs
694                 throw new EndedException("shouldTerminate is true");
695             }
696                 
697             if(inProcessQueues.size()==0) {
698                 // Nothing was ready or in progress or imminent to wake; ensure 
699                 // any piled-up pending-scheduled URIs are considered
700                 this.alreadyIncluded.requestFlush();
701             }    
702         }
703     }
704 
705     private int targetSizeForReadyQueues() {
706         return targetSizeForReadyQueues;
707     }
708 
709     /***
710      * Return the 'cost' of a CrawlURI (how much of its associated
711      * queue's budget it depletes upon attempted processing)
712      * 
713      * @param curi
714      * @return the associated cost
715      */
716     private int getCost(CrawlURI curi) {
717         int cost = curi.getHolderCost();
718         if (cost == CrawlURI.UNCALCULATED) {
719             cost = costAssignmentPolicy.costOf(curi);
720             curi.setHolderCost(cost);
721         }
722         return cost;
723     }
724     
725     /***
726      * Activate an inactive queue, if any are available. 
727      */
728     private void activateInactiveQueue() {
729         Object key = this.inactiveQueues.poll();
730         if (key == null) {
731             return;
732         }
733         WorkQueue candidateQ = (WorkQueue)this.allQueues.get((String)key);
734         if(candidateQ != null) {
735             synchronized(candidateQ) {
736                 replenishSessionBalance(candidateQ);
737                 if(candidateQ.isOverBudget()){
738                     // if still over-budget after an activation & replenishing,
739                     // retire
740                     retireQueue(candidateQ);
741                     return;
742                 } 
743                 long now = System.currentTimeMillis();
744                 long delay_ms = candidateQ.getWakeTime() - now;
745                 if(delay_ms>0) {
746                     // queue still due for snoozing
747                     snoozeQueue(candidateQ,now,delay_ms);
748                     return;
749                 }
750                 candidateQ.setWakeTime(0); // clear obsolete wake time, if any
751                 readyQueue(candidateQ);
752                 if (logger.isLoggable(Level.FINE)) {
753                     logger.fine("ACTIVATED queue: " +
754                         candidateQ.getClassKey());
755                    
756                 }
757             }
758         }
759     }
760 
761     /***
762      * Replenish the budget of the given queue by the appropriate amount.
763      * 
764      * @param queue queue to replenish
765      */
766     private void replenishSessionBalance(WorkQueue queue) {
767         UURI contextUri = queue.getContextUURI(this); 
768         
769         // TODO: consider confusing cross-effects of this and IP-based politeness
770         queue.setSessionBalance(((Integer) getUncheckedAttribute(contextUri,
771                 ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());
772         // reset total budget (it may have changed)
773         // TODO: is this the best way to be sensitive to potential mid-crawl changes
774         long totalBudget = ((Long)getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue();
775         queue.setTotalBudget(totalBudget);
776     }
777 
778     /***
779      * Enqueue the given queue to either readyClassQueues or inactiveQueues,
780      * as appropriate.
781      * 
782      * @param wq
783      */
784     private void reenqueueQueue(WorkQueue wq) {
785         if(wq.isOverBudget()) {
786             // if still over budget, deactivate
787             if (logger.isLoggable(Level.FINE)) {
788                 logger.fine("DEACTIVATED queue: " +
789                     wq.getClassKey());
790             }
791             deactivateQueue(wq);
792         } else {
793             readyQueue(wq);
794         }
795     }
796     
797     /***
798      * Wake any queues sitting in the snoozed queue whose time has come.
799      */
800     void wakeQueues() {
801         long now = System.currentTimeMillis();
802         wakeQueuesAsIfAtTime(now);
803     }
804 
805     /***
806      * Wake any queues sitting in the snoozed queue whose time has come.
807      */
808     void wakeQueuesAsIfAtTime(long nowish) {
809         synchronized (snoozedClassQueues) {
810             long nextWakeDelay = 0;
811             int wokenQueuesCount = 0;
812             while (true) {
813                 if (snoozedClassQueues.isEmpty()) {
814                     return;
815                 }
816                 WorkQueue peek = (WorkQueue) snoozedClassQueues.first();
817                 nextWakeDelay = peek.getWakeTime() - nowish;
818                 if (nextWakeDelay <= 0) {
819                     snoozedClassQueues.remove(peek);
820                     peek.setWakeTime(0);
821                     reenqueueQueue(peek);
822                     wokenQueuesCount++;
823                 } else {
824                     break;
825                 }
826             }
827             this.nextWake = new WakeTask();
828             this.wakeTimer.schedule(nextWake,nextWakeDelay);
829         }
830     }
831     
832     /***
833      * Wake all queues as if we were at the end of time
834      */
835     public void forceWakeQueues() {
836         wakeQueuesAsIfAtTime(Long.MAX_VALUE);
837     }
838     
839     /***
840      * Note that the previously emitted CrawlURI has completed
841      * its processing (for now).
842      *
843      * The CrawlURI may be scheduled to retry, if appropriate,
844      * and other related URIs may become eligible for release
845      * via the next next() call, as a result of finished().
846      *
847      *  (non-Javadoc)
848      * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
849      */
850     public void finished(CrawlURI curi) {
851         long now = System.currentTimeMillis();
852 
853         curi.incrementFetchAttempts();
854         logLocalizedErrors(curi);
855         WorkQueue wq = (WorkQueue) curi.getHolder();
856         assert (wq.peek(this) == curi) : "unexpected peek " + wq;
857         inProcessQueues.remove(wq, 1);
858 
859         if(includesRetireDirective(curi)) {
860             // CrawlURI is marked to trigger retirement of its queue
861             curi.processingCleanup();
862             wq.unpeek();
863             wq.update(this, curi); // rewrite any changes
864             retireQueue(wq);
865             return;
866         }
867         
868         if (needsRetrying(curi)) {
869             // Consider errors which can be retried, leaving uri atop queue
870             if(curi.getFetchStatus()!=S_DEFERRED) {
871                 wq.expend(getCost(curi)); // all retries but DEFERRED cost
872             }
873             long delay_sec = retryDelayFor(curi);
874             curi.processingCleanup(); // lose state that shouldn't burden retry
875             synchronized(wq) {
876                 wq.unpeek();
877                 // TODO: consider if this should happen automatically inside unpeek()
878                 wq.update(this, curi); // rewrite any changes
879                 if (delay_sec > 0) {
880                     long delay_ms = delay_sec * 1000;
881                     snoozeQueue(wq, now, delay_ms);
882                 } else {
883                     reenqueueQueue(wq);
884                 }
885             }
886             // Let everyone interested know that it will be retried.
887             controller.fireCrawledURINeedRetryEvent(curi);
888             doJournalRescheduled(curi);
889             return;
890         }
891 
892         // Curi will definitely be disposed of without retry, so remove from queue
893         wq.dequeue(this);
894         decrementQueuedCount(1);
895         log(curi);
896 
897         if (curi.isSuccess()) {
898             totalProcessedBytes += curi.getRecordedSize();
899             incrementSucceededFetchCount();
900             // Let everyone know in case they want to do something before we strip the curi.
901             controller.fireCrawledURISuccessfulEvent(curi);
902             doJournalFinishedSuccess(curi);
903             wq.expend(getCost(curi)); // successes cost
904         } else if (isDisregarded(curi)) {
905             // Check for codes that mean that while we the crawler did
906             // manage to schedule it, it must be disregarded for some reason.
907             incrementDisregardedUriCount();
908             // Let interested listeners know of disregard disposition.
909             controller.fireCrawledURIDisregardEvent(curi);
910             doJournalDisregarded(curi);
911             // if exception, also send to crawlErrors
912             if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
913                 Object[] array = { curi };
914                 controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
915                         .toString(), array);
916             }
917             // TODO: consider reinstating forget-uri
918         } else {
919             // In that case FAILURE, note & log
920             //Let interested listeners know of failed disposition.
921             this.controller.fireCrawledURIFailureEvent(curi);
922             // if exception, also send to crawlErrors
923             if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
924                 Object[] array = { curi };
925                 this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
926                         .toString(), array);
927             }
928             incrementFailedFetchCount();
929             // let queue note error
930             wq.noteError(((Integer) getUncheckedAttribute(curi,
931                     ATTR_ERROR_PENALTY_AMOUNT)).intValue()); 
932             doJournalFinishedFailure(curi);
933             wq.expend(getCost(curi)); // failures cost
934         }
935 
936         long delay_ms = politenessDelayFor(curi);
937         synchronized(wq) {
938             if (delay_ms > 0) {
939                 snoozeQueue(wq,now,delay_ms);
940             } else {
941                 reenqueueQueue(wq);
942             }
943         }
944 
945         curi.stripToMinimal();
946         curi.processingCleanup();
947 
948     }
949 
950     private boolean includesRetireDirective(CrawlURI curi) {
951         return curi.containsKey(A_FORCE_RETIRE) && (Boolean)curi.getObject(A_FORCE_RETIRE);
952     }
953 
954     /***
955      * Place the given queue into 'snoozed' state, ineligible to
956      * supply any URIs for crawling, for the given amount of time. 
957      * 
958      * @param wq queue to snooze 
959      * @param now time now in ms 
960      * @param delay_ms time to snooze in ms
961      */
962     private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
963         long nextTime = now + delay_ms;
964         wq.setWakeTime(nextTime);
965         long snoozeToInactiveDelayMs = ((Long)getUncheckedAttribute(null,
966                 ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
967         if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) {
968             deactivateQueue(wq);
969         } else {
970             synchronized(snoozedClassQueues) {
971                 snoozedClassQueues.add(wq);
972                 if(wq == snoozedClassQueues.first()) {
973                     this.nextWake = new WakeTask();
974                     this.wakeTimer.schedule(nextWake, delay_ms);
975                 }
976             }
977         }
978     }
979 
980     /***
981      * Forget the given CrawlURI. This allows a new instance
982      * to be created in the future, if it is reencountered under
983      * different circumstances.
984      *
985      * @param curi The CrawlURI to forget
986      */
987     protected void forget(CrawlURI curi) {
988         logger.finer("Forgetting " + curi);
989         alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
990     }
991 
992     /***  (non-Javadoc)
993      * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
994      */
995     public long discoveredUriCount() {
996         return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0;
997     }
998 
999     /***
1000      * Delete all scheduled URIs matching the given regex. 
1001      * 
1002      * @param match regex of URIs to delete
1003      * @return Number of items deleted.
1004      */
1005     public long deleteURIs(String uriMatch) {
1006         return deleteURIs(uriMatch,null);
1007     }
1008 
1009     /***
1010      * Delete all scheduled URIs matching the given regex, in queues with
1011      * names matching the second given regex. 
1012      * 
1013      * @param uriMatch regex of URIs to delete
1014      * @param queueMatch regex of queues to affect, or null for all
1015      * @return Number of items deleted.
1016      */
1017     public long deleteURIs(String uriMatch, String queueMatch) {
1018         long count = 0;
1019         // TODO: DANGER/ values() may not work right from CachedBdbMap
1020         Iterator<String> iter = allQueues.keySet().iterator(); 
1021         while(iter.hasNext()) {
1022             String queueKey = ((String)iter.next());
1023             if(StringUtils.isNotEmpty(queueMatch) && !queueKey.matches(queueMatch)) {
1024                 // skip this queue
1025                 continue; 
1026             }
1027             WorkQueue wq = getQueueFor(queueKey);
1028             wq.unpeek();
1029             count += wq.deleteMatching(this, uriMatch);
1030         }
1031         decrementQueuedCount(count);
1032         return count;
1033     }
1034     
1035     //
1036     // Reporter implementation
1037     //
1038     
1039     public static String STANDARD_REPORT = "standard";
1040     public static String ALL_NONEMPTY = "nonempty";
1041     public static String ALL_QUEUES = "all";
1042     protected static String[] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES};
1043     
1044     public String[] getReports() {
1045         return REPORTS;
1046     }
1047     
1048     /***
1049      * @param w Where to write to.
1050      */
1051     public void singleLineReportTo(PrintWriter w) {
1052         if (this.allQueues == null) {
1053             return;
1054         }
1055         int allCount = allQueues.size();
1056         int inProcessCount = inProcessQueues.uniqueSet().size();
1057         int readyCount = readyClassQueues.size();
1058         int snoozedCount = snoozedClassQueues.size();
1059         int activeCount = inProcessCount + readyCount + snoozedCount;
1060         int inactiveCount = inactiveQueues.size();
1061         int retiredCount = retiredQueues.size();
1062         int exhaustedCount = 
1063             allCount - activeCount - inactiveCount - retiredCount;
1064         w.print(allCount);
1065         w.print(" queues: ");
1066         w.print(activeCount);
1067         w.print(" active (");
1068         w.print(inProcessCount);
1069         w.print(" in-process; ");
1070         w.print(readyCount);
1071         w.print(" ready; ");
1072         w.print(snoozedCount);
1073         w.print(" snoozed); ");
1074         w.print(inactiveCount);
1075         w.print(" inactive; ");
1076         w.print(retiredCount);
1077         w.print(" retired; ");
1078         w.print(exhaustedCount);
1079         w.print(" exhausted");
1080         w.flush();
1081     }
1082     
1083     /* (non-Javadoc)
1084      * @see org.archive.util.Reporter#singleLineLegend()
1085      */
1086     public String singleLineLegend() {
1087         return "total active in-process ready snoozed inactive retired exhausted";
1088     }
1089 
1090     /***
1091      * This method compiles a human readable report on the status of the frontier
1092      * at the time of the call.
1093      * @param name Name of report.
1094      * @param writer Where to write to.
1095      */
1096     public synchronized void reportTo(String name, PrintWriter writer) {
1097         if(ALL_NONEMPTY.equals(name)) {
1098             allNonemptyReportTo(writer);
1099             return;
1100         }
1101         if(ALL_QUEUES.equals(name)) {
1102             allQueuesReportTo(writer);
1103             return;
1104         }
1105         if(name!=null && !STANDARD_REPORT.equals(name)) {
1106             writer.print(name);
1107             writer.print(" unavailable; standard report:\n");
1108         }
1109         standardReportTo(writer);
1110     }   
1111     
1112     /*** Compact report of all nonempty queues (one queue per line)
1113      * 
1114      * @param writer
1115      */
1116     private void allNonemptyReportTo(PrintWriter writer) {
1117         ArrayList<WorkQueue> inProcessQueuesCopy;
1118         synchronized(this.inProcessQueues) {
1119             // grab a copy that will be stable against mods for report duration 
1120             @SuppressWarnings("unchecked")
1121             Collection<WorkQueue> inProcess = this.inProcessQueues;
1122             inProcessQueuesCopy = new ArrayList<WorkQueue>(inProcess);
1123         }
1124         writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1125         queueSingleLinesTo(writer, inProcessQueuesCopy.iterator());
1126 
1127         writer.print("\n -----===== READY QUEUES =====-----\n");
1128         queueSingleLinesTo(writer, this.readyClassQueues.iterator());
1129 
1130         writer.print("\n -----===== SNOOZED QUEUES =====-----\n");
1131         queueSingleLinesTo(writer, this.snoozedClassQueues.iterator());
1132         
1133         writer.print("\n -----===== INACTIVE QUEUES =====-----\n");
1134         queueSingleLinesTo(writer, this.inactiveQueues.iterator());
1135         
1136         writer.print("\n -----===== RETIRED QUEUES =====-----\n");
1137         queueSingleLinesTo(writer, this.retiredQueues.iterator());
1138     }
1139 
1140     /*** Compact report of all nonempty queues (one queue per line)
1141      * 
1142      * @param writer
1143      */
1144     private void allQueuesReportTo(PrintWriter writer) {
1145         queueSingleLinesTo(writer, allQueues.keySet().iterator());
1146     }
1147     
1148     /***
1149      * Write the single-line reports of all queues in the
1150      * iterator to the writer 
1151      * 
1152      * @param writer to receive report
1153      * @param iterator over queues of interest.
1154      */
1155     private void queueSingleLinesTo(PrintWriter writer, Iterator<?> iterator) {
1156         Object obj;
1157         WorkQueue q;
1158         boolean legendWritten = false;
1159         while( iterator.hasNext()) {
1160             obj = iterator.next();
1161             if (obj ==  null) {
1162                 continue;
1163             }
1164             q = (obj instanceof WorkQueue)?
1165                 (WorkQueue)obj:
1166                 (WorkQueue)this.allQueues.get((String)obj);
1167             if(q == null) {
1168                 writer.print(" ERROR: "+obj);
1169             }
1170             if(!legendWritten) {
1171                 writer.println(q.singleLineLegend());
1172                 legendWritten = true;
1173             }
1174             q.singleLineReportTo(writer);
1175         }       
1176     }
1177 
1178     /***
1179      * @param w Writer to print to.
1180      */
1181     private void standardReportTo(PrintWriter w) {
1182         int allCount = allQueues.size();
1183         int inProcessCount = inProcessQueues.uniqueSet().size();
1184         int readyCount = readyClassQueues.size();
1185         int snoozedCount = snoozedClassQueues.size();
1186         int activeCount = inProcessCount + readyCount + snoozedCount;
1187         int inactiveCount = inactiveQueues.size();
1188         int retiredCount = retiredQueues.size();
1189         int exhaustedCount = 
1190             allCount - activeCount - inactiveCount - retiredCount;
1191 
1192         w.print("Frontier report - ");
1193         w.print(ArchiveUtils.get12DigitDate());
1194         w.print("\n");
1195         w.print(" Job being crawled: ");
1196         w.print(controller.getOrder().getCrawlOrderName());
1197         w.print("\n");
1198         w.print("\n -----===== STATS =====-----\n");
1199         w.print(" Discovered:    ");
1200         w.print(Long.toString(discoveredUriCount()));
1201         w.print("\n");
1202         w.print(" Queued:        ");
1203         w.print(Long.toString(queuedUriCount()));
1204         w.print("\n");
1205         w.print(" Finished:      ");
1206         w.print(Long.toString(finishedUriCount()));
1207         w.print("\n");
1208         w.print("  Successfully: ");
1209         w.print(Long.toString(succeededFetchCount()));
1210         w.print("\n");
1211         w.print("  Failed:       ");
1212         w.print(Long.toString(failedFetchCount()));
1213         w.print("\n");
1214         w.print("  Disregarded:  ");
1215         w.print(Long.toString(disregardedUriCount()));
1216         w.print("\n");
1217         w.print("\n -----===== QUEUES =====-----\n");
1218         w.print(" Already included size:     ");
1219         w.print(Long.toString(alreadyIncluded.count()));
1220         w.print("\n");
1221         w.print("               pending:     ");
1222         w.print(Long.toString(alreadyIncluded.pending()));
1223         w.print("\n");
1224         w.print("\n All class queues map size: ");
1225         w.print(Long.toString(allCount));
1226         w.print("\n");
1227         w.print( "             Active queues: ");
1228         w.print(activeCount);
1229         w.print("\n");
1230         w.print("                    In-process: ");
1231         w.print(inProcessCount);
1232         w.print("\n");
1233         w.print("                         Ready: ");
1234         w.print(readyCount);
1235         w.print("\n");
1236         w.print("                       Snoozed: ");
1237         w.print(snoozedCount);
1238         w.print("\n");
1239         w.print("           Inactive queues: ");
1240         w.print(inactiveCount);
1241         w.print("\n");
1242         w.print("            Retired queues: ");
1243         w.print(retiredCount);
1244         w.print("\n");
1245         w.print("          Exhausted queues: ");
1246         w.print(exhaustedCount);
1247         w.print("\n");
1248         
1249         w.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1250         @SuppressWarnings("unchecked")
1251         Collection<WorkQueue> inProcess = inProcessQueues;
1252         ArrayList<WorkQueue> copy = extractSome(inProcess, REPORT_MAX_QUEUES);
1253         appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1254         
1255         w.print("\n -----===== READY QUEUES =====-----\n");
1256         appendQueueReports(w, this.readyClassQueues.iterator(),
1257             this.readyClassQueues.size(), REPORT_MAX_QUEUES);
1258 
1259         w.print("\n -----===== SNOOZED QUEUES =====-----\n");
1260         copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES);
1261         appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1262         
1263         WorkQueue longest = longestActiveQueue;
1264         if (longest != null) {
1265             w.print("\n -----===== LONGEST QUEUE =====-----\n");
1266             longest.reportTo(w);
1267         }
1268 
1269         w.print("\n -----===== INACTIVE QUEUES =====-----\n");
1270         appendQueueReports(w, this.inactiveQueues.iterator(),
1271             this.inactiveQueues.size(), REPORT_MAX_QUEUES);
1272         
1273         w.print("\n -----===== RETIRED QUEUES =====-----\n");
1274         appendQueueReports(w, this.retiredQueues.iterator(),
1275             this.retiredQueues.size(), REPORT_MAX_QUEUES);
1276 
1277         w.flush();
1278     }
1279     
1280     
1281     /***
1282      * Extract some of the elements in the given collection to an
1283      * ArrayList.  This method synchronizes on the given collection's
1284      * monitor.  The returned list will never contain more than the
1285      * specified maximum number of elements.
1286      * 
1287      * @param c    the collection whose elements to extract
1288      * @param max  the maximum number of elements to extract
1289      * @return  the extraction
1290      */
1291     private static <T> ArrayList<T> extractSome(Collection<T> c, int max) {
1292         // Try to guess a sane initial capacity for ArrayList
1293         // Hopefully given collection won't grow more than 10 items
1294         // between now and the synchronized block...
1295         int initial = Math.min(c.size() + 10, max);
1296         int count = 0;
1297         ArrayList<T> list = new ArrayList<T>(initial);
1298         synchronized (c) {
1299             Iterator<T> iter = c.iterator();
1300             while (iter.hasNext() && (count < max)) {
1301                 list.add(iter.next());
1302                 count++;
1303             }
1304         }
1305         return list;
1306     }
1307 
1308     /***
1309      * Append queue report to general Frontier report.
1310      * @param w StringBuffer to append to.
1311      * @param iterator An iterator over 
1312      * @param total
1313      * @param max
1314      */
1315     protected void appendQueueReports(PrintWriter w, Iterator<?> iterator,
1316             int total, int max) {
1317         Object obj;
1318         WorkQueue q;
1319         for(int count = 0; iterator.hasNext() && (count < max); count++) {
1320             obj = iterator.next();
1321             if (obj ==  null) {
1322                 continue;
1323             }
1324             q = (obj instanceof WorkQueue)?
1325                 (WorkQueue)obj:
1326                 (WorkQueue)this.allQueues.get((String)obj);
1327             if(q == null) {
1328                 w.print("WARNING: No report for queue "+obj);
1329             }
1330             q.reportTo(w);
1331         }
1332         if(total > max) {
1333             w.print("...and " + (total - max) + " more.\n");
1334         }
1335     }
1336 
1337     /***
1338      * Force logging, etc. of operator- deleted CrawlURIs
1339      * 
1340      * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI)
1341      */
1342     public synchronized void deleted(CrawlURI curi) {
1343         //treat as disregarded
1344         controller.fireCrawledURIDisregardEvent(curi);
1345         log(curi);
1346         incrementDisregardedUriCount();
1347         curi.stripToMinimal();
1348         curi.processingCleanup();
1349     }
1350 
1351     public void considerIncluded(UURI u) {
1352         this.alreadyIncluded.note(canonicalize(u));
1353         CrawlURI temp = new CrawlURI(u);
1354         temp.setClassKey(getClassKey(temp));
1355         getQueueFor(temp).expend(getCost(temp));
1356     }
1357     
    /**
     * Hook for subclasses to set up their queue structures. The exact work
     * done is implementation-defined (NOTE(review): presumably opens the
     * backing store for queues; confirm against implementations).
     *
     * @throws IOException if the implementation fails to prepare its queues
     */
    protected abstract void initQueue() throws IOException;
    /**
     * Hook for subclasses to release their queue structures. The exact work
     * done is implementation-defined (NOTE(review): presumably closes the
     * backing store opened by initQueue(); confirm against implementations).
     *
     * @throws IOException if the implementation fails to close its queues
     */
    protected abstract void closeQueue() throws IOException;
    
    /***
     * Returns <code>true</code> if the WorkQueue implementation of this
     * Frontier stores its workload on disk instead of relying
     * on serialization mechanisms.
     * 
     * TODO: rename! (this is a very misleading name) or kill (don't
     * see any implementations that return false)
     * 
     * @return a constant boolean value for this class/instance
     */
    protected abstract boolean workQueueDataOnDisk();
1372     
1373     
1374     public FrontierGroup getGroup(CrawlURI curi) {
1375         return getQueueFor(curi);
1376     }
1377     
1378     
1379     public long averageDepth() {
1380         int inProcessCount = inProcessQueues.uniqueSet().size();
1381         int readyCount = readyClassQueues.size();
1382         int snoozedCount = snoozedClassQueues.size();
1383         int activeCount = inProcessCount + readyCount + snoozedCount;
1384         int inactiveCount = inactiveQueues.size();
1385         int totalQueueCount = (activeCount+inactiveCount);
1386         return (totalQueueCount == 0) ? 0 : liveQueuedUriCount.get() / totalQueueCount;
1387     }
1388     public float congestionRatio() {
1389         int inProcessCount = inProcessQueues.uniqueSet().size();
1390         int readyCount = readyClassQueues.size();
1391         int snoozedCount = snoozedClassQueues.size();
1392         int activeCount = inProcessCount + readyCount + snoozedCount;
1393         int inactiveCount = inactiveQueues.size();
1394         return (float)(activeCount + inactiveCount) / (inProcessCount + snoozedCount);
1395     }
1396     public long deepestUri() {
1397         return longestActiveQueue==null ? -1 : longestActiveQueue.getCount();
1398     }
1399     
1400     
1401     /* (non-Javadoc)
1402      * @see org.archive.crawler.framework.Frontier#isEmpty()
1403      */
1404     public synchronized boolean isEmpty() {
1405         return liveQueuedUriCount.get() == 0 && alreadyIncluded.pending() == 0;
1406     }
1407 }
1408