View Javadoc

1   /* AbstractFrontier
2    *
3    * $Id: AbstractFrontier.java 6704 2009-11-25 01:38:55Z gojomo $
4    *
5    * Created on Aug 17, 2004
6    *
7    * Copyright (C) 2004 Internet Archive.
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.crawler.frontier;
26  
27  import java.io.BufferedWriter;
28  import java.io.File;
29  import java.io.FileOutputStream;
30  import java.io.IOException;
31  import java.io.OutputStreamWriter;
32  import java.io.PrintWriter;
33  import java.io.Serializable;
34  import java.io.StringWriter;
35  import java.io.Writer;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  import java.util.regex.Pattern;
42  
43  import javax.management.AttributeNotFoundException;
44  
45  import org.apache.commons.httpclient.HttpStatus;
46  import org.archive.crawler.datamodel.CandidateURI;
47  import org.archive.crawler.datamodel.CoreAttributeConstants;
48  import org.archive.crawler.datamodel.CrawlHost;
49  import org.archive.crawler.datamodel.CrawlOrder;
50  import org.archive.crawler.datamodel.CrawlServer;
51  import org.archive.crawler.datamodel.CrawlSubstats;
52  import org.archive.crawler.datamodel.CrawlURI;
53  import org.archive.crawler.datamodel.FetchStatusCodes;
54  import org.archive.crawler.datamodel.RobotsExclusionPolicy;
55  import org.archive.crawler.datamodel.CrawlSubstats.Stage;
56  import org.archive.crawler.event.CrawlStatusListener;
57  import org.archive.crawler.framework.CrawlController;
58  import org.archive.crawler.framework.Frontier;
59  import org.archive.crawler.framework.ToeThread;
60  import org.archive.crawler.framework.exceptions.EndedException;
61  import org.archive.crawler.framework.exceptions.FatalConfigurationException;
62  import org.archive.crawler.settings.ModuleType;
63  import org.archive.crawler.settings.RegularExpressionConstraint;
64  import org.archive.crawler.settings.SimpleType;
65  import org.archive.crawler.settings.Type;
66  import org.archive.crawler.url.Canonicalizer;
67  import org.archive.net.UURI;
68  import org.archive.util.ArchiveUtils;
69  
70  /***
71   * Shared facilities for Frontier implementations.
72   * 
73   * @author gojomo
74   */
75  public abstract class AbstractFrontier extends ModuleType
76  implements CrawlStatusListener, Frontier, FetchStatusCodes,
77          CoreAttributeConstants, Serializable {
78      private static final long serialVersionUID = -4766504935003203930L;
79  
80      private static final Logger logger = Logger
81              .getLogger(AbstractFrontier.class.getName());
82  
83      protected transient CrawlController controller;
84  
85      /*** ordinal numbers to assign to created CrawlURIs */
86      protected AtomicLong nextOrdinal = new AtomicLong(1); 
87  
88      /*** should the frontier hold any threads asking for URIs? */
89      protected boolean shouldPause = false;
90  
91      /***
92       * should the frontier send an EndedException to any threads asking for
93       * URIs?
94       */
95      protected transient boolean shouldTerminate = false;
96  
97      /***
98       * how many multiples of last fetch elapsed time to wait before recontacting
99       * same server
100      */
101     public final static String ATTR_DELAY_FACTOR = "delay-factor";
102 
103     protected final static Float DEFAULT_DELAY_FACTOR = new Float(5);
104 
105     /***
106      * always wait this long after one completion before recontacting same
107      * server, regardless of multiple
108      */
109     public final static String ATTR_MIN_DELAY = "min-delay-ms";
110 
111     // 3 secs.
112     protected final static Integer DEFAULT_MIN_DELAY = new Integer(3000);
113 
114     /***
115      * Whether to respect a 'Crawl-Delay' (in seconds) given in a site's
116      * robots.txt
117      */
118     public final static String 
119         ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS = "respect-crawl-delay-up-to-secs";
120 
121     // by default, respect robots.txt-provided Crawl-Delay up to 300 secs
122     protected final static Integer 
123         DEFAULT_RESPECT_CRAWL_DELAY_UP_TO_SECS = 300; // 5 minutes
124     
125     /*** never wait more than this long, regardless of multiple */
126     public final static String ATTR_MAX_DELAY = "max-delay-ms";
127 
128     // 30 secs
129     protected final static Integer DEFAULT_MAX_DELAY = new Integer(30000);
130 
131     /*** number of hops of embeds (ERX) to bump to front of host queue */
132     public final static String ATTR_PREFERENCE_EMBED_HOPS =
133         "preference-embed-hops";
134 
135     protected final static Integer DEFAULT_PREFERENCE_EMBED_HOPS =
136         new Integer(1);
137 
138     /*** maximum per-host bandwidth usage */
139     public final static String ATTR_MAX_HOST_BANDWIDTH_USAGE =
140         "max-per-host-bandwidth-usage-KB-sec";
141 
142     protected final static Integer DEFAULT_MAX_HOST_BANDWIDTH_USAGE =
143         new Integer(0);
144 
145     /*** maximum overall bandwidth usage */
146     public final static String ATTR_MAX_OVERALL_BANDWIDTH_USAGE =
147         "total-bandwidth-usage-KB-sec";
148 
149     protected final static Integer DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE =
150         new Integer(0);
151 
152     /*** for retryable problems, seconds to wait before a retry */
153     public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";
154 
155     // 15 mins
156     protected final static Long DEFAULT_RETRY_DELAY = new Long(900);
157 
158     /*** maximum times to emit a CrawlURI without final disposition */
159     public final static String ATTR_MAX_RETRIES = "max-retries";
160 
161     protected final static Integer DEFAULT_MAX_RETRIES = new Integer(30);
162 
163     public final static String ATTR_QUEUE_ASSIGNMENT_POLICY =
164         "queue-assignment-policy";
165 
166     /*** queue assignment to force onto CrawlURIs; intended to be overridden */
167     public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";
168 
169     protected final static String DEFAULT_FORCE_QUEUE = "";
170 
171     // word chars, dash, period, comma, colon
172     protected final static String ACCEPTABLE_FORCE_QUEUE = "[-//w//.,:]*";
173         
174     /*** whether pause, rather than finish, when crawl appears done */
175     public final static String ATTR_PAUSE_AT_FINISH = "pause-at-finish";
176     // TODO: change default to true once well-tested
177     protected final static Boolean DEFAULT_PAUSE_AT_FINISH = Boolean.FALSE;
178     
179     /*** whether to pause at crawl start */
180     public final static String ATTR_PAUSE_AT_START = "pause-at-start";
181     protected final static Boolean DEFAULT_PAUSE_AT_START = Boolean.FALSE;
182     
183     /*** whether to pause at crawl start */
184     public final static String ATTR_SOURCE_TAG_SEEDS = "source-tag-seeds";
185     protected final static Boolean DEFAULT_SOURCE_TAG_SEEDS = Boolean.FALSE;
186 
187     /***
188      * Recover log on or off attribute.
189      */
190     protected final static String ATTR_RECOVERY_ENABLED =
191         "recovery-log-enabled";
192     protected final static Boolean DEFAULT_ATTR_RECOVERY_ENABLED =
193         Boolean.TRUE;
194 
195     // to maintain serialization compatibility, stored under old names
196     protected long queuedUriCount;
197     protected long succeededFetchCount;
198     protected long failedFetchCount;
199     protected long disregardedUriCount;
200     
201     // top-level stats
202     /*** total URIs queued to be visited */
203     transient protected AtomicLong liveQueuedUriCount = new AtomicLong(0); 
204 
205     transient protected AtomicLong liveSucceededFetchCount = new AtomicLong(0);
206 
207     transient protected AtomicLong liveFailedFetchCount = new AtomicLong(0);
208 
209     /*** URIs that are disregarded (for example because of robot.txt rules */
210     transient protected AtomicLong liveDisregardedUriCount = new AtomicLong(0);
211 
212     /***
213      * Used when bandwidth constraint are used.
214      */
215     protected long totalProcessedBytes = 0;
216 
217     private transient long nextURIEmitTime = 0;
218 
219     protected long processedBytesAfterLastEmittedURI = 0;
220     
221     protected int lastMaxBandwidthKB = 0;
222 
223     /***
224      * Crawl replay logger.
225      * 
226      * Currently captures Frontier/URI transitions.
227      * Can be null if user chose not to run a recovery.log.
228      */
229     private transient FrontierJournal recover = null;
230 
231     /*** file collecting report of ignored seed-file entries (if any) */
232     public static final String IGNORED_SEEDS_FILENAME = "seeds.ignored";
233 
234     /***
235      * @param name Name of this frontier.
236      * @param description Description for this frontier.
237      */
238     public AbstractFrontier(String name, String description) {
239         super(name, description);
240         addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
241                 "How many multiples of last fetch elapsed time to wait before "
242                         + "recontacting same server", DEFAULT_DELAY_FACTOR));
243         addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
244                 "Never wait more than this long.", DEFAULT_MAX_DELAY));
245         addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
246                 "Always wait this long after one completion before recontacting "
247                         + "same server.", DEFAULT_MIN_DELAY));
248         addElementToDefinition(new SimpleType(ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS,
249                 "Respect a Crawl-Delay directive in a site's robots.txt "
250                 +"up to this value in seconds. (If longer, simply "
251                 +"respect this value.) Default is 300 seconds (5 minutes).", 
252                 DEFAULT_RESPECT_CRAWL_DELAY_UP_TO_SECS));
253         addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
254                 "How often to retry fetching a URI that failed to be retrieved. "
255                         + "If zero, the crawler will get the robots.txt only.",
256                 DEFAULT_MAX_RETRIES));
257         addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
258                 "How long to wait by default until we retry fetching a"
259                         + " URI that failed to be retrieved (seconds). ",
260                 DEFAULT_RETRY_DELAY));
261         addElementToDefinition(new SimpleType(
262                 ATTR_PREFERENCE_EMBED_HOPS,
263                 "Number of embedded (or redirected) hops up to which "
264                 + "a URI has higher priority scheduling. For example, if set "
265                 + "to 1 (the default), items such as inline images (1-hop "
266                 + "embedded resources) will be scheduled ahead of all regular "
267                 + "links (or many-hop resources, like nested frames). If set to "
268                 + "zero, no preferencing will occur, and embeds/redirects are "
269                 + "scheduled the same as regular links.",
270                 DEFAULT_PREFERENCE_EMBED_HOPS));
271         Type t;
272         t = addElementToDefinition(new SimpleType(
273                 ATTR_MAX_OVERALL_BANDWIDTH_USAGE,
274                 "The maximum average bandwidth the crawler is allowed to use. "
275                 + "The actual read speed is not affected by this setting, it only "
276                 + "holds back new URIs from being processed when the bandwidth "
277                 + "usage has been to high. 0 means no bandwidth limitation.",
278                 DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE));
279         t.setOverrideable(false);
280         t = addElementToDefinition(new SimpleType(
281                 ATTR_MAX_HOST_BANDWIDTH_USAGE,
282                 "The maximum average bandwidth the crawler is allowed to use per "
283                 + "host. The actual read speed is not affected by this setting, "
284                 + "it only holds back new URIs from being processed when the "
285                 + "bandwidth usage has been to high. 0 means no bandwidth "
286                 + "limitation.", DEFAULT_MAX_HOST_BANDWIDTH_USAGE));
287         t.setExpertSetting(true);
288 
289         // Read the list of permissible choices from heritrix.properties.
290         // Its a list of space- or comma-separated values.
291         String queueStr = System.getProperty(AbstractFrontier.class.getName() +
292                 "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
293                 HostnameQueueAssignmentPolicy.class.getName() + " " +
294                 IPQueueAssignmentPolicy.class.getName() + " " +
295                 BucketQueueAssignmentPolicy.class.getName() + " " +
296                 SurtAuthorityQueueAssignmentPolicy.class.getName() + " " +
297                 TopmostAssignedSurtQueueAssignmentPolicy.class.getName());
298         Pattern p = Pattern.compile("//s*,//s*|//s+");
299         String [] queues = p.split(queueStr);
300         if (queues.length <= 0) {
301             throw new RuntimeException("Failed parse of " +
302                     " assignment queue policy string: " + queueStr);
303         }
304         t = addElementToDefinition(new SimpleType(ATTR_QUEUE_ASSIGNMENT_POLICY,
305                 "Defines how to assign URIs to queues. Can assign by host, " +
306                 "by ip, and into one of a fixed set of buckets (1k).",
307                 queues[0], queues));
308         t.setExpertSetting(true);
309         t.setOverrideable(true);
310 
311         t = addElementToDefinition(new SimpleType(
312                 ATTR_FORCE_QUEUE,
313                 "The queue name into which to force URIs. Should "
314                 + "be left blank at global level.  Specify a "
315                 + "per-domain/per-host override to force URIs into "
316                 + "a particular named queue, regardless of the assignment "
317                 + "policy in effect (domain or ip-based politeness). "
318                 + "This could be used on domains known to all be from "
319                 + "the same small set of IPs (eg blogspot, dailykos, etc.) "
320                 + "to simulate IP-based politeness, or it could be used if "
321                 + "you wanted to enforce politeness over a whole domain, even "
322                 + "though the subdomains are split across many IPs.",
323                 DEFAULT_FORCE_QUEUE));
324         t.setOverrideable(true);
325         t.setExpertSetting(true);
326         t.addConstraint(new RegularExpressionConstraint(ACCEPTABLE_FORCE_QUEUE,
327                 Level.WARNING, "This field must contain only alphanumeric "
328                 + "characters plus period, dash, comma, colon, or underscore."));
329         t = addElementToDefinition(new SimpleType(
330                 ATTR_PAUSE_AT_START,
331                 "Whether to pause when the crawl begins, before any URIs " +
332                 "are tried. This gives the operator a chance to verify or " +
333                 "adjust the crawl before actual work begins. " +
334                 "Default is false.", DEFAULT_PAUSE_AT_START));
335         t = addElementToDefinition(new SimpleType(
336                 ATTR_PAUSE_AT_FINISH,
337                 "Whether to pause when the crawl appears finished, rather "
338                 + "than immediately end the crawl. This gives the operator an "
339                 + "opportunity to view crawl results, and possibly add URIs or "
340                 + "adjust settings, while the crawl state is still available. "
341                 + "Default is false.", DEFAULT_PAUSE_AT_FINISH));
342         t.setOverrideable(false);
343         
344         t = addElementToDefinition(new SimpleType(
345                 ATTR_SOURCE_TAG_SEEDS,
346                 "Whether to tag seeds with their own URI as a heritable " +
347                 "'source' String, which will be carried-forward to all URIs " +
348                 "discovered on paths originating from that seed. When " +
349                 "present, such source tags appear in the second-to-last " +
350                 "crawl.log field.", DEFAULT_SOURCE_TAG_SEEDS));
351         t.setOverrideable(false);
352         
353         t = addElementToDefinition(new SimpleType(ATTR_RECOVERY_ENABLED,
354                 "Set to false to disable recovery log writing.  Do this if " +
355                 "you you are using the checkpoint feature for recovering " +
356                 "crashed crawls.", DEFAULT_ATTR_RECOVERY_ENABLED));
357         t.setExpertSetting(true);
358         // No sense in it being overrideable.
359         t.setOverrideable(false);
360     }
361 
362     public void start() {
363         if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_START))
364                 .booleanValue()) {
365             // trigger crawl-wide pause
366             controller.requestCrawlPause();
367         } else {
368             // simply begin
369             unpause(); 
370         }
371     }
372     
373     synchronized public void pause() {
374         shouldPause = true;
375     }
376 
377     synchronized public void unpause() {
378         shouldPause = false;
379         notifyAll();
380     }
381 
382     public void initialize(CrawlController c)
383             throws FatalConfigurationException, IOException {
384         c.addCrawlStatusListener(this);
385         File logsDisk = null;
386         try {
387             logsDisk = c.getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
388         } catch (AttributeNotFoundException e) {
389             logger.log(Level.SEVERE, "Failed to get logs directory", e);
390         }
391         if (logsDisk != null) {
392             String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
393             if (((Boolean)getUncheckedAttribute(null, ATTR_RECOVERY_ENABLED))
394                     .booleanValue()) {
395                 this.recover = new RecoveryJournal(logsPath,
396                     FrontierJournal.LOGNAME_RECOVER);
397             }
398         }
399 //        try {
400 //            final Class qapClass = Class.forName((String)getUncheckedAttribute(
401 //                    null, ATTR_QUEUE_ASSIGNMENT_POLICY));
402 //
403 //            queueAssignmentPolicy =
404 //                (QueueAssignmentPolicy)qapClass.newInstance();
405 //        } catch (Exception e) {
406 //            logger.log(Level.SEVERE, "Bad queue assignment policy class", e);
407 //            throw new FatalConfigurationException(e.getMessage());
408 //        }
409     }
410 
411     synchronized public void terminate() {
412         shouldTerminate = true;
413         if (this.recover != null) {
414             this.recover.close();
415             this.recover = null;
416         }
417         unpause();
418     }
419 
420     /***
421      * Report CrawlURI to each of the three 'substats' accumulators
422      * (group/queue, server, host) for a given stage. 
423      * 
424      * @param curi
425      * @param stage
426      */
427     protected void tally(CrawlURI curi, Stage stage) {
428         // Tally per-server, per-host, per-frontier-class running totals
429         CrawlServer server =
430             controller.getServerCache().getServerFor(curi);
431         if (server != null) {
432             server.getSubstats().tally(curi, stage);
433         }
434         CrawlHost host = 
435             controller.getServerCache().getHostFor(curi);
436         if (host != null) {
437             host.getSubstats().tally(curi, stage);
438         } 
439         FrontierGroup group = 
440             controller.getFrontier().getGroup(curi);
441         group.getSubstats().tally(curi, stage);
442     }
443     
444     protected void doJournalFinishedSuccess(CrawlURI c) {
445         tally(c,CrawlSubstats.Stage.SUCCEEDED);
446         if (this.recover != null) {
447             this.recover.finishedSuccess(c);
448         }
449     }
450 
451     protected void doJournalAdded(CrawlURI c) {
452         tally(c,CrawlSubstats.Stage.SCHEDULED);
453         if (this.recover != null) {
454             this.recover.added(c);
455         }
456     }
457 
458     protected void doJournalRescheduled(CrawlURI c) {
459         tally(c,CrawlSubstats.Stage.RETRIED);
460         if (this.recover != null) {
461             this.recover.rescheduled(c);
462         }
463     }
464 
465     protected void doJournalFinishedFailure(CrawlURI c) {
466         tally(c,CrawlSubstats.Stage.FAILED);
467         if (this.recover != null) {
468             this.recover.finishedFailure(c);
469         }
470     }
471     
472     protected void doJournalDisregarded(CrawlURI c) {
473         tally(c,CrawlSubstats.Stage.DISREGARDED);
474         if (this.recover != null) {
475             this.recover.finishedDisregard(c);
476         }
477     }
478 
479     protected void doJournalEmitted(CrawlURI c) {
480         if (this.recover != null) {
481             this.recover.emitted(c);
482         }
483     }
484 
485     /***
486      * Frontier is empty only if all queues are empty and no URIs are in-process
487      * 
488      * @return True if queues are empty.
489      */
490     public boolean isEmpty() {
491         return liveQueuedUriCount.get() == 0;
492     }
493 
494     /***
495      * Increment the running count of queued URIs. 
496      */
497     protected void incrementQueuedUriCount() {
498         liveQueuedUriCount.incrementAndGet();
499     }
500 
501     /***
502      * Increment the running count of queued URIs. Synchronized because
503      * operations on longs are not atomic.
504      * 
505      * @param increment
506      *            amount to increment the queued count
507      */
508     protected void incrementQueuedUriCount(long increment) {
509         liveQueuedUriCount.addAndGet(increment);
510     }
511 
512     /***
513      * Note that a number of queued Uris have been deleted.
514      * 
515      * @param numberOfDeletes
516      */
517     protected void decrementQueuedCount(long numberOfDeletes) {
518         liveQueuedUriCount.addAndGet(-numberOfDeletes);
519     }
520 
521     /***
522      * (non-Javadoc)
523      * 
524      * @see org.archive.crawler.framework.Frontier#queuedUriCount()
525      */
526     public long queuedUriCount() {
527         return liveQueuedUriCount.get();
528     }
529 
530     /***
531      * (non-Javadoc)
532      * 
533      * @see org.archive.crawler.framework.Frontier#finishedUriCount()
534      */
535     public long finishedUriCount() {
536         return liveSucceededFetchCount.get() + liveFailedFetchCount.get() + liveDisregardedUriCount.get();
537     }
538 
539     /***
540      * Increment the running count of successfully fetched URIs. 
541      */
542     protected void incrementSucceededFetchCount() {
543         liveSucceededFetchCount.incrementAndGet();
544     }
545 
546     /***
547      * (non-Javadoc)
548      * 
549      * @see org.archive.crawler.framework.Frontier#succeededFetchCount()
550      */
551     public long succeededFetchCount() {
552         return liveSucceededFetchCount.get();
553     }
554 
555     /***
556      * Increment the running count of failed URIs. 
557      */
558     protected void incrementFailedFetchCount() {
559         liveFailedFetchCount.incrementAndGet();
560     }
561 
562     /***
563      * (non-Javadoc)
564      * 
565      * @see org.archive.crawler.framework.Frontier#failedFetchCount()
566      */
567     public long failedFetchCount() {
568         return liveFailedFetchCount.get();
569     }
570 
571     /***
572      * Increment the running count of disregarded URIs. Synchronized because
573      * operations on longs are not atomic.
574      */
575     protected void incrementDisregardedUriCount() {
576         liveDisregardedUriCount.incrementAndGet();
577     }
578 
579     public long disregardedUriCount() {
580         return liveDisregardedUriCount.get();
581     }
582 
583     /*** @deprecated misnomer; use StatisticsTracking figures instead */
584     public long totalBytesWritten() {
585         return totalProcessedBytes;
586     }
587 
588     /***
589      * Load up the seeds.
590      * 
591      * This method is called on initialize and inside in the crawlcontroller
592      * when it wants to force reloading of configuration.
593      * 
594      * @see org.archive.crawler.framework.CrawlController#kickUpdate()
595      */
596     public void loadSeeds() {
597         Writer ignoredWriter = new StringWriter();
598         logger.info("beginning");
599         // Get the seeds to refresh.
600         Iterator iter = this.controller.getScope().seedsIterator(ignoredWriter);
601         int count = 0; 
602         while (iter.hasNext()) {
603             UURI u = (UURI)iter.next();
604             CandidateURI caUri = CandidateURI.createSeedCandidateURI(u);
605             caUri.setSchedulingDirective(CandidateURI.MEDIUM);
606             if (((Boolean)getUncheckedAttribute(null, ATTR_SOURCE_TAG_SEEDS))
607                     .booleanValue()) {
608                 caUri.putString(CoreAttributeConstants.A_SOURCE_TAG,caUri.toString());
609                 caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG);
610             }
611             schedule(caUri);
612             count++;
613             if(count%1000==0) {
614                 logger.info(count+" seeds");
615             }
616         }
617         // save ignored items (if any) where they can be consulted later
618         saveIgnoredItems(ignoredWriter.toString(), controller.getDisk());
619         logger.info("finished");
620     }
621 
622     /***
623      * Dump ignored seed items (if any) to disk; delete file otherwise.
624      * Static to allow non-derived sibling classes (frontiers not yet 
625      * subclassed here) to reuse.
626      * 
627      * @param ignoredItems
628      * @param dir 
629      */
630     public static void saveIgnoredItems(String ignoredItems, File dir) {
631         File ignoredFile = new File(dir, IGNORED_SEEDS_FILENAME);
632         if(ignoredItems==null | ignoredItems.length()>0) {
633             try {
634                 BufferedWriter bw = new BufferedWriter(
635                     new OutputStreamWriter(new FileOutputStream(ignoredFile),"UTF-8"));
636                 bw.write(ignoredItems);
637                 bw.close();
638             } catch (IOException e) {
639                 // TODO make an alert?
640                 e.printStackTrace();
641             }
642         } else {
643             // delete any older file (if any)
644             ignoredFile.delete();
645         }
646     }
647 
648     protected CrawlURI asCrawlUri(CandidateURI caUri) {
649         CrawlURI curi;
650         if (caUri instanceof CrawlURI) {
651             curi = (CrawlURI)caUri;
652         } else {
653             curi = CrawlURI.from(caUri, nextOrdinal.getAndIncrement());
654         }
655         curi.setClassKey(getClassKey(curi));
656         return curi;
657     }
658 
659     /***
660      * @param now
661      * @throws InterruptedException
662      * @throws EndedException
663      */
664     protected synchronized void preNext(long now) throws InterruptedException,
665             EndedException {
666         if (this.controller == null) {
667             return;
668         }
669         
670         // Check completion conditions
671         if (this.controller.atFinish()) {
672             if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_FINISH))
673                     .booleanValue()) {
674                 this.controller.requestCrawlPause();
675             } else {
676                 this.controller.beginCrawlStop();
677             }
678         }
679 
680         // enforce operator pause
681         if (shouldPause) {
682             while (shouldPause) {
683                 this.controller.toePaused();
684                 wait();
685             }
686             // exitted pause; possibly finish regardless of pause-at-finish
687             if (controller != null && controller.atFinish()) {
688                 this.controller.beginCrawlStop();
689             }
690         }
691 
692         // enforce operator terminate or thread retirement
693         if (shouldTerminate
694                 || ((ToeThread)Thread.currentThread()).shouldRetire()) {
695             throw new EndedException("terminated");
696         }
697 
698         enforceBandwidthThrottle(now);
699     }
700 
701     /***
702      * Perform any special handling of the CrawlURI, such as promoting its URI
703      * to seed-status, or preferencing it because it is an embed.
704      * 
705      * @param curi
706      */
707     protected void applySpecialHandling(CrawlURI curi) {
708         if (curi.isSeed() && curi.getVia() != null
709                 && curi.flattenVia().length() > 0) {
710             // The only way a seed can have a non-empty via is if it is the
711             // result of a seed redirect. Add it to the seeds list.
712             //
713             // This is a feature. This is handling for case where a seed
714             // gets immediately redirected to another page. What we're doing is
715             // treating the immediate redirect target as a seed.
716             this.controller.getScope().addSeed(curi);
717             // And it needs rapid scheduling.
718             if (curi.getSchedulingDirective() == CandidateURI.NORMAL) {
719                 curi.setSchedulingDirective(CandidateURI.MEDIUM);
720             }
721         }
722 
723         // optionally preferencing embeds up to MEDIUM
724         int prefHops = ((Integer)getUncheckedAttribute(curi,
725                 ATTR_PREFERENCE_EMBED_HOPS)).intValue();
726         if (prefHops > 0) {
727             int embedHops = curi.getTransHops();
728             if (embedHops > 0 && embedHops <= prefHops
729                     && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
730                 // number of embed hops falls within the preferenced range, and
731                 // uri is not already MEDIUM -- so promote it
732                 curi.setSchedulingDirective(CandidateURI.MEDIUM);
733             }
734         }
735     }
736 
737     /***
738      * Perform fixups on a CrawlURI about to be returned via next().
739      * 
740      * @param curi
741      *            CrawlURI about to be returned by next()
742      * @param q
743      *            the queue from which the CrawlURI came
744      */
745     protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) {
746         curi.setHolder(q);
747         // if (curi.getServer() == null) {
748         //    // TODO: perhaps short-circuit the emit here,
749         //    // because URI will be rejected as unfetchable
750         // }
751         doJournalEmitted(curi);
752     }
753 
754     /***
755      * @param curi
756      * @return the CrawlServer to be associated with this CrawlURI
757      */
758     protected CrawlServer getServer(CrawlURI curi) {
759         return this.controller.getServerCache().getServerFor(curi);
760     }
761 
762     /***
763      * Return a suitable value to wait before retrying the given URI.
764      * 
765      * @param curi
766      *            CrawlURI to be retried
767      * @return millisecond delay before retry
768      */
769     protected long retryDelayFor(CrawlURI curi) {
770         int status = curi.getFetchStatus();
771         return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST ||
772                 status == S_DOMAIN_UNRESOLVABLE)?
773             ((Long)getUncheckedAttribute(curi, ATTR_RETRY_DELAY)).longValue():
774             0; // no delay for most
775     }
776 
777     /***
778      * Update any scheduling structures with the new information in this
779      * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
780      * the same host to be visited within the appropriate politeness window.
781      * 
782      * @param curi
783      *            The CrawlURI
784      * @return millisecond politeness delay
785      */
786     protected long politenessDelayFor(CrawlURI curi) {
787         long durationToWait = 0;
788         if (curi.containsKey(A_FETCH_BEGAN_TIME)
789                 && curi.containsKey(A_FETCH_COMPLETED_TIME)) {
790 
791             long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
792             long durationTaken = (completeTime - curi
793                     .getLong(A_FETCH_BEGAN_TIME));
794             durationToWait = (long)(((Float)getUncheckedAttribute(curi,
795                     ATTR_DELAY_FACTOR)).floatValue() * durationTaken);
796 
797             long minDelay = ((Integer)getUncheckedAttribute(curi,
798                     ATTR_MIN_DELAY)).longValue();
799             
800             if (minDelay > durationToWait) {
801                 // wait at least the minimum
802                 durationToWait = minDelay;
803             }
804 
805             long maxDelay = ((Integer)getUncheckedAttribute(curi,
806                     ATTR_MAX_DELAY)).longValue();
807             if (durationToWait > maxDelay) {
808                 // wait no more than the maximum
809                 durationToWait = maxDelay;
810             }
811 
812             long respectThreshold = ((Integer)getUncheckedAttribute(curi,
813                     ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS)).longValue()*1000;
814             
815             if(durationToWait<respectThreshold) {
816                 // may need to extend wait
817                 CrawlServer s = controller.getServerCache().getServerFor(curi);
818                 String ua = curi.getUserAgent();
819                 if(ua==null) {
820                     ua = controller.getOrder().getUserAgent(curi);
821                 }
822                 RobotsExclusionPolicy rep = s.getRobots(); 
823                 if (rep!=null) {
824                     long crawlDelay = (long)(1000 * s.getRobots().getCrawlDelay(ua));
825                     crawlDelay = 
826                         (crawlDelay > respectThreshold) 
827                             ? respectThreshold 
828                             : crawlDelay; 
829                     if (crawlDelay > durationToWait) {
830                         // wait at least the directive crawl-delay
831                         durationToWait = crawlDelay;
832                     }
833                 }
834             }
835             
836             long now = System.currentTimeMillis();
837             int maxBandwidthKB = ((Integer)getUncheckedAttribute(curi,
838                     ATTR_MAX_HOST_BANDWIDTH_USAGE)).intValue();
839             if (maxBandwidthKB > 0) {
840                 // Enforce bandwidth limit
841                 CrawlHost host = controller.getServerCache().getHostFor(curi);
842                 long minDurationToWait = host.getEarliestNextURIEmitTime()
843                         - now;
844                 float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
845                 long processedBytes = curi.getContentSize();
846                 host
847                         .setEarliestNextURIEmitTime((long)(processedBytes / maxBandwidth)
848                                 + now);
849 
850                 if (minDurationToWait > durationToWait) {
851                     durationToWait = minDurationToWait;
852                 }
853             }
854         }
855         return durationToWait;
856     }
857 
858     /***
859      * Ensure that any overall-bandwidth-usage limit is respected, by pausing as
860      * long as necessary.
861      * 
862      * @param now
863      * @throws InterruptedException
864      */
865     private void enforceBandwidthThrottle(long now) throws InterruptedException {
866         int maxBandwidthKB = ((Integer)getUncheckedAttribute(null,
867                 ATTR_MAX_OVERALL_BANDWIDTH_USAGE)).intValue();
868         if (maxBandwidthKB > 0) {
869             // Make sure that new bandwidth setting doesn't affect total crawl
870             if (maxBandwidthKB != lastMaxBandwidthKB) {
871                 lastMaxBandwidthKB = maxBandwidthKB;
872                 processedBytesAfterLastEmittedURI = totalProcessedBytes;
873             }
874 
875             // Enforce bandwidth limit
876             long sleepTime = nextURIEmitTime - now;
877             float maxBandwidth = maxBandwidthKB * 1.024F; // Kilo_factor
878             long processedBytes = totalProcessedBytes
879                     - processedBytesAfterLastEmittedURI;
880             long shouldHaveEmittedDiff = nextURIEmitTime == 0? 0
881                     : nextURIEmitTime - now;
882             nextURIEmitTime = (long)(processedBytes / maxBandwidth) + now
883                     + shouldHaveEmittedDiff;
884             processedBytesAfterLastEmittedURI = totalProcessedBytes;
885             if (sleepTime > 0) {
886                 long targetTime = now + sleepTime;
887                 now = System.currentTimeMillis();
888                 while (now < targetTime) {
889                     synchronized (this) {
890                         if (logger.isLoggable(Level.FINE)) {
891                             logger.fine("Frontier waits for: " + sleepTime
892                                     + "ms to respect bandwidth limit.");
893                         }
894                         // TODO: now that this is a wait(), frontier can
895                         // still schedule and finish items while waiting,
896                         // which is good, but multiple threads could all
897                         // wait for the same wakeTime, which somewhat
898                         // spoils the throttle... should be fixed.
899                         wait(targetTime - now);
900                     }
901                     now = System.currentTimeMillis();
902                 }
903             }
904         }
905     }
906 
907     /***
908      * Take note of any processor-local errors that have been entered into the
909      * CrawlURI.
910      * 
911      * @param curi
912      *  
913      */
914     protected void logLocalizedErrors(CrawlURI curi) {
915         if (curi.containsKey(A_LOCALIZED_ERRORS)) {
916             List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS);
917             Iterator iter = localErrors.iterator();
918             while (iter.hasNext()) {
919                 Object array[] = {curi, iter.next()};
920                 controller.localErrors.log(Level.WARNING, curi.getUURI()
921                         .toString(), array);
922             }
923             // once logged, discard
924             curi.remove(A_LOCALIZED_ERRORS);
925         }
926     }
927 
928     /***
929      * Utility method to return a scratch dir for the given key's temp files.
930      * Every key gets its own subdir. To avoid having any one directory with
931      * thousands of files, there are also two levels of enclosing directory
932      * named by the least-significant hex digits of the key string's java
933      * hashcode.
934      * 
935      * @param key
936      * @return File representing scratch directory
937      */
938     protected File scratchDirFor(String key) {
939         String hex = Integer.toHexString(key.hashCode());
940         while (hex.length() < 4) {
941             hex = "0" + hex;
942         }
943         int len = hex.length();
944         return new File(this.controller.getStateDisk(), hex.substring(len - 2,
945                 len)
946                 + File.separator
947                 + hex.substring(len - 4, len - 2)
948                 + File.separator + key);
949     }
950 
951     protected boolean overMaxRetries(CrawlURI curi) {
952         // never retry more than the max number of times
953         if (curi.getFetchAttempts() >= ((Integer)getUncheckedAttribute(curi,
954                 ATTR_MAX_RETRIES)).intValue()) {
955             return true;
956         }
957         return false;
958     }
959 
960     public void importRecoverLog(String pathToLog, boolean retainFailures)
961             throws IOException {
962         File source = new File(pathToLog);
963         if (!source.isAbsolute()) {
964             source = new File(getSettingsHandler().getOrder().getController()
965                     .getDisk(), pathToLog);
966         }
967         RecoveryJournal.importRecoverLog(source, controller, retainFailures);
968     }
969 
970     /*
971      * (non-Javadoc)
972      * 
973      * @see org.archive.crawler.framework.URIFrontier#kickUpdate()
974      */
975     public void kickUpdate() {
976         // by default, do nothing
977         // (scope will loadSeeds, if appropriate)
978     }
979 
980     /***
981      * Log to the main crawl.log
982      * 
983      * @param curi
984      */
985     protected void log(CrawlURI curi) {
986         curi.aboutToLog();
987         Object array[] = {curi};
988         this.controller.uriProcessing.log(Level.INFO,
989                 curi.getUURI().toString(), array);
990     }
991 
992     protected boolean isDisregarded(CrawlURI curi) {
993         switch (curi.getFetchStatus()) {
994         case S_ROBOTS_PRECLUDED: // they don't want us to have it
995         case S_BLOCKED_BY_CUSTOM_PROCESSOR:
996         case S_OUT_OF_SCOPE: // filtered out by scope
997         case S_BLOCKED_BY_USER: // filtered out by user
998         case S_TOO_MANY_EMBED_HOPS: // too far from last true link
999         case S_TOO_MANY_LINK_HOPS: // too far from seeds
1000         case S_DELETED_BY_USER: // user deleted
1001             return true;
1002         default:
1003             return false;
1004         }
1005     }
1006 
1007     /***
1008      * Checks if a recently completed CrawlURI that did not finish successfully
1009      * needs to be retried (processed again after some time elapses)
1010      * 
1011      * @param curi
1012      *            The CrawlURI to check
1013      * @return True if we need to retry.
1014      */
1015     protected boolean needsRetrying(CrawlURI curi) {
1016         if (overMaxRetries(curi)) {
1017             return false;
1018         }
1019 
1020         switch (curi.getFetchStatus()) {
1021         case HttpStatus.SC_UNAUTHORIZED:
1022             // We can get here though usually a positive status code is
1023             // a success. We get here if there is rfc2617 credential data
1024             // loaded and we're supposed to go around again. See if any
1025             // rfc2617 credential present and if there, assume it got
1026             // loaded in FetchHTTP on expectation that we're to go around
1027             // again. If no rfc2617 loaded, we should not be here.
1028             boolean loaded = curi.hasRfc2617CredentialAvatar();
1029             if (!loaded && logger.isLoggable(Level.INFO)) {
1030                 logger.info("Have 401 but no creds loaded " + curi);
1031             }
1032             return loaded;
1033         case S_DEFERRED:
1034         case S_CONNECT_FAILED:
1035         case S_CONNECT_LOST:
1036         case S_DOMAIN_UNRESOLVABLE:
1037             // these are all worth a retry
1038             // TODO: consider if any others (S_TIMEOUT in some cases?) deserve
1039             // retry
1040             return true;
1041         default:
1042             return false;
1043         }
1044     }
1045 
1046     /***
1047      * Canonicalize passed uuri. Its would be sweeter if this canonicalize
1048      * function was encapsulated by that which it canonicalizes but because
1049      * settings change with context -- i.e. there may be overrides in operation
1050      * for a particular URI -- its not so easy; Each CandidateURI would need a
1051      * reference to the settings system. That's awkward to pass in.
1052      * 
1053      * @param uuri Candidate URI to canonicalize.
1054      * @return Canonicalized version of passed <code>uuri</code>.
1055      */
1056     protected String canonicalize(UURI uuri) {
1057         return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
1058     }
1059 
1060     /***
1061      * Canonicalize passed CandidateURI. This method differs from
1062      * {@link #canonicalize(UURI)} in that it takes a look at
1063      * the CandidateURI context possibly overriding any canonicalization effect if
1064      * it could make us miss content. If canonicalization produces an URL that
1065      * was 'alreadyseen', but the entry in the 'alreadyseen' database did
1066      * nothing but redirect to the current URL, we won't get the current URL;
1067      * we'll think we've already see it. Examples would be archive.org
1068      * redirecting to www.archive.org or the inverse, www.netarkivet.net
1069      * redirecting to netarkivet.net (assuming stripWWW rule enabled).
1070      * <p>Note, this method under circumstance sets the forceFetch flag.
1071      * 
1072      * @param cauri CandidateURI to examine.
1073      * @return Canonicalized <code>cacuri</code>.
1074      */
1075     protected String canonicalize(CandidateURI cauri) {
1076         String canon = canonicalize(cauri.getUURI());
1077         if (cauri.isLocation()) {
1078             // If the via is not the same as where we're being redirected (i.e.
1079             // we're not being redirected back to the same page, AND the
1080             // canonicalization of the via is equal to the the current cauri, 
1081             // THEN forcefetch (Forcefetch so no chance of our not crawling
1082             // content because alreadyseen check things its seen the url before.
1083             // An example of an URL that redirects to itself is:
1084             // http://bridalelegance.com/images/buttons3/tuxedos-off.gif.
1085             // An example of an URL whose canonicalization equals its via's
1086             // canonicalization, and we want to fetch content at the
1087             // redirection (i.e. need to set forcefetch), is netarkivet.dk.
1088             if (!cauri.toString().equals(cauri.getVia().toString()) &&
1089                     canonicalize(cauri.getVia()).equals(canon)) {
1090                 cauri.setForceFetch(true);
1091             }
1092         }
1093         return canon;
1094     }
1095 
1096     /***
1097      * @param cauri CrawlURI we're to get a key for.
1098      * @return a String token representing a queue
1099      */
1100     public String getClassKey(CandidateURI cauri) {
1101         String queueKey = (String)getUncheckedAttribute(cauri,
1102             ATTR_FORCE_QUEUE);
1103         if ("".equals(queueKey)) {
1104             // no forced override
1105             QueueAssignmentPolicy queueAssignmentPolicy = 
1106                 getQueueAssignmentPolicy(cauri);
1107             queueKey =
1108                 queueAssignmentPolicy.getClassKey(this.controller, cauri);
1109         }
1110         return queueKey;
1111     }
1112 
1113     protected QueueAssignmentPolicy getQueueAssignmentPolicy(CandidateURI cauri) {
1114         String clsName = (String)getUncheckedAttribute(cauri,
1115                 ATTR_QUEUE_ASSIGNMENT_POLICY);
1116         try {
1117             return (QueueAssignmentPolicy) Class.forName(clsName).newInstance();
1118         } catch (Exception e) {
1119             throw new RuntimeException(e);
1120         }
1121     }
1122 
1123     /***
1124      * @return RecoveryJournal instance.  May be null.
1125      */
1126     public FrontierJournal getFrontierJournal() {
1127         return this.recover;
1128     }
1129 
1130     public void crawlEnding(String sExitMessage) {
1131         // TODO Auto-generated method stub
1132     }
1133 
1134     public void crawlEnded(String sExitMessage) {
1135         if (logger.isLoggable(Level.INFO)) {
1136             logger.info("Closing with " + Long.toString(queuedUriCount()) +
1137                 " urls still in queue.");
1138         }
1139     }
1140 
1141     public void crawlStarted(String message) {
1142         // TODO Auto-generated method stub
1143     }
1144 
1145     public void crawlPausing(String statusMessage) {
1146         // TODO Auto-generated method stub
1147     }
1148 
1149     public void crawlPaused(String statusMessage) {
1150         // TODO Auto-generated method stub
1151     }
1152 
1153     public void crawlResuming(String statusMessage) {
1154         // TODO Auto-generated method stub
1155     }
1156     
1157     public void crawlCheckpoint(File checkpointDir)
1158     throws Exception {
1159         if (this.recover == null) {
1160             return;
1161         }
1162         this.recover.checkpoint(checkpointDir);
1163     }
1164     
1165     //
1166     // Reporter implementation
1167     // 
1168     public String singleLineReport() {
1169         return ArchiveUtils.singleLineReport(this);
1170     }
1171 
1172     public void reportTo(PrintWriter writer) {
1173         reportTo(null, writer);
1174     }
1175     
1176     //
1177     // maintain serialization compatibility to pre-AtomicLong impl
1178     private void writeObject(java.io.ObjectOutputStream out)
1179     throws IOException {
1180         queuedUriCount = liveQueuedUriCount.get();
1181         succeededFetchCount = liveSucceededFetchCount.get();
1182         failedFetchCount = liveFailedFetchCount.get();
1183         disregardedUriCount = liveDisregardedUriCount.get();
1184         out.defaultWriteObject();
1185     }
1186     private void readObject(java.io.ObjectInputStream in)
1187     throws IOException, ClassNotFoundException {
1188         in.defaultReadObject();
1189         liveQueuedUriCount = new AtomicLong(queuedUriCount);
1190         liveSucceededFetchCount = new AtomicLong(succeededFetchCount);
1191         liveFailedFetchCount = new AtomicLong(failedFetchCount);
1192         liveDisregardedUriCount = new AtomicLong(disregardedUriCount);
1193     }
1194 }