package org.archive.crawler.frontier;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import javax.management.AttributeNotFoundException;

import org.apache.commons.httpclient.HttpStatus;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CoreAttributeConstants;
import org.archive.crawler.datamodel.CrawlHost;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlServer;
import org.archive.crawler.datamodel.CrawlSubstats;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.datamodel.RobotsExclusionPolicy;
import org.archive.crawler.datamodel.CrawlSubstats.Stage;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
import org.archive.crawler.framework.ToeThread;
import org.archive.crawler.framework.exceptions.EndedException;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.settings.ModuleType;
import org.archive.crawler.settings.RegularExpressionConstraint;
import org.archive.crawler.settings.SimpleType;
import org.archive.crawler.settings.Type;
import org.archive.crawler.url.Canonicalizer;
import org.archive.net.UURI;
import org.archive.util.ArchiveUtils;

/**
 * Shared facilities for Frontier implementations.
 *
 * @author gojomo
 */
public abstract class AbstractFrontier extends ModuleType
        implements CrawlStatusListener, Frontier, FetchStatusCodes,
        CoreAttributeConstants, Serializable {
    private static final long serialVersionUID = -4766504935003203930L;

    private static final Logger logger = Logger
            .getLogger(AbstractFrontier.class.getName());

    protected transient CrawlController controller;

    /** ordinal numbers to assign to created CrawlURIs */
    protected AtomicLong nextOrdinal = new AtomicLong(1);

    /** should the frontier hold any threads asking for URIs? */
    protected boolean shouldPause = false;

    /**
     * should the frontier send an EndedException to any threads asking for
     * URIs?
     */
    protected transient boolean shouldTerminate = false;

    /**
     * how many multiples of last fetch elapsed time to wait before
     * recontacting same server
     */
    public final static String ATTR_DELAY_FACTOR = "delay-factor";

    protected final static Float DEFAULT_DELAY_FACTOR = new Float(5);

    /**
     * always wait this long after one completion before recontacting same
     * server, regardless of multiple
     */
    public final static String ATTR_MIN_DELAY = "min-delay-ms";

    protected final static Integer DEFAULT_MIN_DELAY = new Integer(3000);

    /**
     * Whether to respect a 'Crawl-Delay' (in seconds) given in a site's
     * robots.txt
     */
    public final static String
        ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS = "respect-crawl-delay-up-to-secs";

    protected final static Integer
        DEFAULT_RESPECT_CRAWL_DELAY_UP_TO_SECS = 300;

    /** never wait more than this long, regardless of multiple */
    public final static String ATTR_MAX_DELAY = "max-delay-ms";

    protected final static Integer DEFAULT_MAX_DELAY = new Integer(30000);

    /** number of hops of embeds (ERX) to bump to front of host queue */
    public final static String ATTR_PREFERENCE_EMBED_HOPS =
        "preference-embed-hops";

    protected final static Integer DEFAULT_PREFERENCE_EMBED_HOPS =
        new Integer(1);

    /** maximum per-host bandwidth usage */
    public final static String ATTR_MAX_HOST_BANDWIDTH_USAGE =
        "max-per-host-bandwidth-usage-KB-sec";

    protected final static Integer DEFAULT_MAX_HOST_BANDWIDTH_USAGE =
        new Integer(0);

    /** maximum overall bandwidth usage */
    public final static String ATTR_MAX_OVERALL_BANDWIDTH_USAGE =
        "total-bandwidth-usage-KB-sec";

    protected final static Integer DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE =
        new Integer(0);

    /** for retryable problems, seconds to wait before a retry */
    public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";

    protected final static Long DEFAULT_RETRY_DELAY = new Long(900);

    /** maximum times to emit a CrawlURI without final disposition */
    public final static String ATTR_MAX_RETRIES = "max-retries";

    protected final static Integer DEFAULT_MAX_RETRIES = new Integer(30);

    public final static String ATTR_QUEUE_ASSIGNMENT_POLICY =
        "queue-assignment-policy";

    /** queue assignment to force onto CrawlURIs; intended to be overridden */
    public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";

    protected final static String DEFAULT_FORCE_QUEUE = "";

    protected final static String ACCEPTABLE_FORCE_QUEUE = "[-\\w\\.,:]*";

    /** whether pause, rather than finish, when crawl appears done */
    public final static String ATTR_PAUSE_AT_FINISH = "pause-at-finish";

    protected final static Boolean DEFAULT_PAUSE_AT_FINISH = Boolean.FALSE;

    /** whether to pause at crawl start */
    public final static String ATTR_PAUSE_AT_START = "pause-at-start";
    protected final static Boolean DEFAULT_PAUSE_AT_START = Boolean.FALSE;

    /** whether to tag seeds with their own URI as a heritable 'source' String */
    public final static String ATTR_SOURCE_TAG_SEEDS = "source-tag-seeds";
    protected final static Boolean DEFAULT_SOURCE_TAG_SEEDS = Boolean.FALSE;

    /**
     * Recover log on or off attribute.
     */
    protected final static String ATTR_RECOVERY_ENABLED =
        "recovery-log-enabled";
    protected final static Boolean DEFAULT_ATTR_RECOVERY_ENABLED =
        Boolean.TRUE;

    protected long queuedUriCount;
    protected long succeededFetchCount;
    protected long failedFetchCount;
    protected long disregardedUriCount;

    /** total URIs queued to be visited */
    transient protected AtomicLong liveQueuedUriCount = new AtomicLong(0);

    transient protected AtomicLong liveSucceededFetchCount = new AtomicLong(0);

    transient protected AtomicLong liveFailedFetchCount = new AtomicLong(0);

    /** URIs that are disregarded (for example because of robots.txt rules) */
    transient protected AtomicLong liveDisregardedUriCount = new AtomicLong(0);

    /**
     * Used when bandwidth constraints are in effect.
     */
    protected long totalProcessedBytes = 0;

    private transient long nextURIEmitTime = 0;

    protected long processedBytesAfterLastEmittedURI = 0;

    protected int lastMaxBandwidthKB = 0;

    /**
     * Crawl replay logger.
     *
     * Currently captures Frontier/URI transitions.
     * Can be null if user chose not to run a recovery.log.
     */
    private transient FrontierJournal recover = null;

    /** file collecting report of ignored seed-file entries (if any) */
    public static final String IGNORED_SEEDS_FILENAME = "seeds.ignored";

    /**
     * @param name Name of this frontier.
     * @param description Description for this frontier.
     */
    public AbstractFrontier(String name, String description) {
        super(name, description);
        addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
            "How many multiples of last fetch elapsed time to wait before "
            + "recontacting same server", DEFAULT_DELAY_FACTOR));
        addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
            "Never wait more than this long.", DEFAULT_MAX_DELAY));
        addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
            "Always wait this long after one completion before recontacting "
            + "same server.", DEFAULT_MIN_DELAY));
        addElementToDefinition(new SimpleType(ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS,
            "Respect a Crawl-Delay directive in a site's robots.txt "
            + "up to this value in seconds. (If longer, simply "
            + "respect this value.) Default is 300 seconds (5 minutes).",
            DEFAULT_RESPECT_CRAWL_DELAY_UP_TO_SECS));
        addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
            "How often to retry fetching a URI that failed to be retrieved. "
            + "If zero, the crawler will get the robots.txt only.",
            DEFAULT_MAX_RETRIES));
        addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
            "How long to wait by default until we retry fetching a"
            + " URI that failed to be retrieved (seconds). ",
            DEFAULT_RETRY_DELAY));
        addElementToDefinition(new SimpleType(
            ATTR_PREFERENCE_EMBED_HOPS,
            "Number of embedded (or redirected) hops up to which "
            + "a URI has higher priority scheduling. For example, if set "
            + "to 1 (the default), items such as inline images (1-hop "
            + "embedded resources) will be scheduled ahead of all regular "
            + "links (or many-hop resources, like nested frames). If set to "
            + "zero, no preferencing will occur, and embeds/redirects are "
            + "scheduled the same as regular links.",
            DEFAULT_PREFERENCE_EMBED_HOPS));
        Type t;
        t = addElementToDefinition(new SimpleType(
            ATTR_MAX_OVERALL_BANDWIDTH_USAGE,
            "The maximum average bandwidth the crawler is allowed to use. "
            + "The actual read speed is not affected by this setting, it only "
            + "holds back new URIs from being processed when the bandwidth "
277 + "usage has been to high. 0 means no bandwidth limitation.",
            DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE));
        t.setOverrideable(false);
        t = addElementToDefinition(new SimpleType(
            ATTR_MAX_HOST_BANDWIDTH_USAGE,
            "The maximum average bandwidth the crawler is allowed to use per "
            + "host. The actual read speed is not affected by this setting, "
            + "it only holds back new URIs from being processed when the "
285 + "bandwidth usage has been to high. 0 means no bandwidth "
286 + "limitation.", DEFAULT_MAX_HOST_BANDWIDTH_USAGE));
287 t.setExpertSetting(true);
288
289
290
291 String queueStr = System.getProperty(AbstractFrontier.class.getName() +
292 "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
293 HostnameQueueAssignmentPolicy.class.getName() + " " +
294 IPQueueAssignmentPolicy.class.getName() + " " +
295 BucketQueueAssignmentPolicy.class.getName() + " " +
296 SurtAuthorityQueueAssignmentPolicy.class.getName() + " " +
297 TopmostAssignedSurtQueueAssignmentPolicy.class.getName());
        Pattern p = Pattern.compile("\\s*,\\s*|\\s+");
        String[] queues = p.split(queueStr);
        if (queues.length <= 0) {
            throw new RuntimeException("Failed parse of " +
                " assignment queue policy string: " + queueStr);
        }
        t = addElementToDefinition(new SimpleType(ATTR_QUEUE_ASSIGNMENT_POLICY,
            "Defines how to assign URIs to queues. Can assign by host, " +
            "by ip, and into one of a fixed set of buckets (1k).",
            queues[0], queues));
        t.setExpertSetting(true);
        t.setOverrideable(true);

        t = addElementToDefinition(new SimpleType(
            ATTR_FORCE_QUEUE,
            "The queue name into which to force URIs. Should "
            + "be left blank at global level. Specify a "
            + "per-domain/per-host override to force URIs into "
            + "a particular named queue, regardless of the assignment "
            + "policy in effect (domain or ip-based politeness). "
            + "This could be used on domains known to all be from "
            + "the same small set of IPs (eg blogspot, dailykos, etc.) "
            + "to simulate IP-based politeness, or it could be used if "
            + "you wanted to enforce politeness over a whole domain, even "
            + "though the subdomains are split across many IPs.",
            DEFAULT_FORCE_QUEUE));
        t.setOverrideable(true);
        t.setExpertSetting(true);
        t.addConstraint(new RegularExpressionConstraint(ACCEPTABLE_FORCE_QUEUE,
            Level.WARNING, "This field must contain only alphanumeric "
            + "characters plus period, dash, comma, colon, or underscore."));
        t = addElementToDefinition(new SimpleType(
            ATTR_PAUSE_AT_START,
            "Whether to pause when the crawl begins, before any URIs " +
            "are tried. This gives the operator a chance to verify or " +
            "adjust the crawl before actual work begins. " +
            "Default is false.", DEFAULT_PAUSE_AT_START));
        t = addElementToDefinition(new SimpleType(
            ATTR_PAUSE_AT_FINISH,
            "Whether to pause when the crawl appears finished, rather "
            + "than immediately end the crawl. This gives the operator an "
            + "opportunity to view crawl results, and possibly add URIs or "
            + "adjust settings, while the crawl state is still available. "
            + "Default is false.", DEFAULT_PAUSE_AT_FINISH));
        t.setOverrideable(false);

        t = addElementToDefinition(new SimpleType(
            ATTR_SOURCE_TAG_SEEDS,
            "Whether to tag seeds with their own URI as a heritable " +
            "'source' String, which will be carried-forward to all URIs " +
            "discovered on paths originating from that seed. When " +
            "present, such source tags appear in the second-to-last " +
            "crawl.log field.", DEFAULT_SOURCE_TAG_SEEDS));
        t.setOverrideable(false);

        t = addElementToDefinition(new SimpleType(ATTR_RECOVERY_ENABLED,
            "Set to false to disable recovery log writing. Do this if " +
355 "you you are using the checkpoint feature for recovering " +
356 "crashed crawls.", DEFAULT_ATTR_RECOVERY_ENABLED));
357 t.setExpertSetting(true);
358
359 t.setOverrideable(false);
360 }
361
362 public void start() {
363 if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_START))
364 .booleanValue()) {
365
366 controller.requestCrawlPause();
367 } else {
368
369 unpause();
370 }
371 }
372
373 synchronized public void pause() {
374 shouldPause = true;
375 }
376
377 synchronized public void unpause() {
378 shouldPause = false;
379 notifyAll();
380 }
381
382 public void initialize(CrawlController c)
383 throws FatalConfigurationException, IOException {
384 c.addCrawlStatusListener(this);
385 File logsDisk = null;
386 try {
387 logsDisk = c.getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
388 } catch (AttributeNotFoundException e) {
389 logger.log(Level.SEVERE, "Failed to get logs directory", e);
390 }
391 if (logsDisk != null) {
392 String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
393 if (((Boolean)getUncheckedAttribute(null, ATTR_RECOVERY_ENABLED))
394 .booleanValue()) {
395 this.recover = new RecoveryJournal(logsPath,
396 FrontierJournal.LOGNAME_RECOVER);
397 }
398 }
399
400
401
402
403
404
405
406
407
408
409 }

    synchronized public void terminate() {
        shouldTerminate = true;
        if (this.recover != null) {
            this.recover.close();
            this.recover = null;
        }
        unpause();
    }

    /**
     * Report CrawlURI to each of the three 'substats' accumulators
     * (group/queue, server, host) for a given stage.
     *
     * @param curi
     * @param stage
     */
    protected void tally(CrawlURI curi, Stage stage) {
        CrawlServer server =
            controller.getServerCache().getServerFor(curi);
        if (server != null) {
            server.getSubstats().tally(curi, stage);
        }
        CrawlHost host =
            controller.getServerCache().getHostFor(curi);
        if (host != null) {
            host.getSubstats().tally(curi, stage);
        }
        FrontierGroup group =
            controller.getFrontier().getGroup(curi);
        group.getSubstats().tally(curi, stage);
    }

    protected void doJournalFinishedSuccess(CrawlURI c) {
        tally(c, CrawlSubstats.Stage.SUCCEEDED);
        if (this.recover != null) {
            this.recover.finishedSuccess(c);
        }
    }

    protected void doJournalAdded(CrawlURI c) {
        tally(c, CrawlSubstats.Stage.SCHEDULED);
        if (this.recover != null) {
            this.recover.added(c);
        }
    }

    protected void doJournalRescheduled(CrawlURI c) {
        tally(c, CrawlSubstats.Stage.RETRIED);
        if (this.recover != null) {
            this.recover.rescheduled(c);
        }
    }

    protected void doJournalFinishedFailure(CrawlURI c) {
        tally(c, CrawlSubstats.Stage.FAILED);
        if (this.recover != null) {
            this.recover.finishedFailure(c);
        }
    }

    protected void doJournalDisregarded(CrawlURI c) {
        tally(c, CrawlSubstats.Stage.DISREGARDED);
        if (this.recover != null) {
            this.recover.finishedDisregard(c);
        }
    }

    protected void doJournalEmitted(CrawlURI c) {
        if (this.recover != null) {
            this.recover.emitted(c);
        }
    }

    /**
     * Frontier is empty only if all queues are empty and no URIs are in-process
     *
     * @return True if queues are empty.
     */
    public boolean isEmpty() {
        return liveQueuedUriCount.get() == 0;
    }

    /**
     * Increment the running count of queued URIs.
     */
    protected void incrementQueuedUriCount() {
        liveQueuedUriCount.incrementAndGet();
    }

    /**
     * Increment the running count of queued URIs by the given amount.
     *
     * @param increment
     *            amount to increment the queued count
     */
    protected void incrementQueuedUriCount(long increment) {
        liveQueuedUriCount.addAndGet(increment);
    }

    /**
     * Note that a number of queued Uris have been deleted.
     *
     * @param numberOfDeletes
     */
    protected void decrementQueuedCount(long numberOfDeletes) {
        liveQueuedUriCount.addAndGet(-numberOfDeletes);
    }

    /**
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.Frontier#queuedUriCount()
     */
    public long queuedUriCount() {
        return liveQueuedUriCount.get();
    }

    /**
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.Frontier#finishedUriCount()
     */
    public long finishedUriCount() {
        return liveSucceededFetchCount.get() + liveFailedFetchCount.get()
            + liveDisregardedUriCount.get();
    }

    /**
     * Increment the running count of successfully fetched URIs.
     */
    protected void incrementSucceededFetchCount() {
        liveSucceededFetchCount.incrementAndGet();
    }

    /**
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.Frontier#succeededFetchCount()
     */
    public long succeededFetchCount() {
        return liveSucceededFetchCount.get();
    }

    /**
     * Increment the running count of failed URIs.
     */
    protected void incrementFailedFetchCount() {
        liveFailedFetchCount.incrementAndGet();
    }

    /**
     * (non-Javadoc)
     *
     * @see org.archive.crawler.framework.Frontier#failedFetchCount()
     */
    public long failedFetchCount() {
        return liveFailedFetchCount.get();
    }

    /**
     * Increment the running count of disregarded URIs.
     */
    protected void incrementDisregardedUriCount() {
        liveDisregardedUriCount.incrementAndGet();
    }

    public long disregardedUriCount() {
        return liveDisregardedUriCount.get();
    }

    /** @deprecated misnomer; use StatisticsTracking figures instead */
    public long totalBytesWritten() {
        return totalProcessedBytes;
    }

    /**
     * Load up the seeds.
     *
     * This method is called at initialization and from within the
     * CrawlController when it wants to force a reload of configuration.
     *
     * @see org.archive.crawler.framework.CrawlController#kickUpdate()
     */
    public void loadSeeds() {
        Writer ignoredWriter = new StringWriter();
        logger.info("beginning");
        Iterator iter = this.controller.getScope().seedsIterator(ignoredWriter);
        int count = 0;
        while (iter.hasNext()) {
            UURI u = (UURI)iter.next();
            CandidateURI caUri = CandidateURI.createSeedCandidateURI(u);
            caUri.setSchedulingDirective(CandidateURI.MEDIUM);
            if (((Boolean)getUncheckedAttribute(null, ATTR_SOURCE_TAG_SEEDS))
                    .booleanValue()) {
                caUri.putString(CoreAttributeConstants.A_SOURCE_TAG, caUri.toString());
                caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG);
            }
            schedule(caUri);
            count++;
            if (count % 1000 == 0) {
                logger.info(count + " seeds");
            }
        }
        saveIgnoredItems(ignoredWriter.toString(), controller.getDisk());
        logger.info("finished");
    }

    /**
     * Dump ignored seed items (if any) to disk; delete file otherwise.
     * Static to allow non-derived sibling classes (frontiers not yet
     * subclassed here) to reuse.
     *
     * @param ignoredItems
     * @param dir
     */
    public static void saveIgnoredItems(String ignoredItems, File dir) {
        File ignoredFile = new File(dir, IGNORED_SEEDS_FILENAME);
        if (ignoredItems != null && ignoredItems.length() > 0) {
            try {
                BufferedWriter bw = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(ignoredFile), "UTF-8"));
                bw.write(ignoredItems);
                bw.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            ignoredFile.delete();
        }
    }

    protected CrawlURI asCrawlUri(CandidateURI caUri) {
        CrawlURI curi;
        if (caUri instanceof CrawlURI) {
            curi = (CrawlURI)caUri;
        } else {
            curi = CrawlURI.from(caUri, nextOrdinal.getAndIncrement());
        }
        curi.setClassKey(getClassKey(curi));
        return curi;
    }

    /**
     * @param now
     * @throws InterruptedException
     * @throws EndedException
     */
    protected synchronized void preNext(long now) throws InterruptedException,
            EndedException {
        if (this.controller == null) {
            return;
        }

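        // If the crawl appears finished, either pause (when 'pause-at-finish'
        // is set) or begin the normal stop sequence.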
        if (this.controller.atFinish()) {
            if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_FINISH))
                    .booleanValue()) {
                this.controller.requestCrawlPause();
            } else {
                this.controller.beginCrawlStop();
            }
        }

        if (shouldPause) {
            while (shouldPause) {
                this.controller.toePaused();
                wait();
            }
            if (controller != null && controller.atFinish()) {
                this.controller.beginCrawlStop();
            }
        }

        if (shouldTerminate
                || ((ToeThread)Thread.currentThread()).shouldRetire()) {
            throw new EndedException("terminated");
        }

        enforceBandwidthThrottle(now);
    }

    /**
     * Perform any special handling of the CrawlURI, such as promoting its URI
     * to seed-status, or preferencing it because it is an embed.
     *
     * @param curi
     */
    protected void applySpecialHandling(CrawlURI curi) {
        if (curi.isSeed() && curi.getVia() != null
                && curi.flattenVia().length() > 0) {
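            // A CrawlURI marked as a seed but carrying a 'via' is the target
            // of a redirect from a seed: add it to the scope's seed list and
            // give it at least MEDIUM scheduling priority.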
            this.controller.getScope().addSeed(curi);
            if (curi.getSchedulingDirective() == CandidateURI.NORMAL) {
                curi.setSchedulingDirective(CandidateURI.MEDIUM);
            }
        }

        int prefHops = ((Integer)getUncheckedAttribute(curi,
            ATTR_PREFERENCE_EMBED_HOPS)).intValue();
        if (prefHops > 0) {
            int embedHops = curi.getTransHops();
            if (embedHops > 0 && embedHops <= prefHops
                    && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
                curi.setSchedulingDirective(CandidateURI.MEDIUM);
            }
        }
    }

    /**
     * Perform fixups on a CrawlURI about to be returned via next().
     *
     * @param curi
     *            CrawlURI about to be returned by next()
     * @param q
     *            the queue from which the CrawlURI came
     */
    protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) {
        curi.setHolder(q);
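        // Record the emission in the recovery journal (if one is running) so
        // that an interrupted crawl can later be replayed from the log.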
        doJournalEmitted(curi);
    }

    /**
     * @param curi
     * @return the CrawlServer to be associated with this CrawlURI
     */
    protected CrawlServer getServer(CrawlURI curi) {
        return this.controller.getServerCache().getServerFor(curi);
    }

    /**
     * Return a suitable value to wait before retrying the given URI.
     *
     * @param curi
     *            CrawlURI to be retried
     * @return millisecond delay before retry
     */
    protected long retryDelayFor(CrawlURI curi) {
        int status = curi.getFetchStatus();
        return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST ||
                status == S_DOMAIN_UNRESOLVABLE) ?
            ((Long)getUncheckedAttribute(curi, ATTR_RETRY_DELAY)).longValue() :
            0;
    }

    /**
     * Update any scheduling structures with the new information in this
     * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
     * the same host to be visited within the appropriate politeness window.
     *
     * @param curi
     *            The CrawlURI
     * @return millisecond politeness delay
     */
    protected long politenessDelayFor(CrawlURI curi) {
        long durationToWait = 0;
        if (curi.containsKey(A_FETCH_BEGAN_TIME)
                && curi.containsKey(A_FETCH_COMPLETED_TIME)) {
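            // Base politeness delay: 'delay-factor' multiples of the elapsed
            // fetch time, clamped to the [min-delay-ms, max-delay-ms] range.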
            long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
            long durationTaken = (completeTime - curi
                .getLong(A_FETCH_BEGAN_TIME));
            durationToWait = (long)(((Float)getUncheckedAttribute(curi,
                ATTR_DELAY_FACTOR)).floatValue() * durationTaken);

            long minDelay = ((Integer)getUncheckedAttribute(curi,
                ATTR_MIN_DELAY)).longValue();
            if (minDelay > durationToWait) {
                durationToWait = minDelay;
            }

            long maxDelay = ((Integer)getUncheckedAttribute(curi,
                ATTR_MAX_DELAY)).longValue();
            if (durationToWait > maxDelay) {
                durationToWait = maxDelay;
            }

            long respectThreshold = ((Integer)getUncheckedAttribute(curi,
                ATTR_RESPECT_CRAWL_DELAY_UP_TO_SECS)).longValue() * 1000;
            if (durationToWait < respectThreshold) {
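                // Below the respect threshold, also honor a robots.txt
                // Crawl-Delay (converted to milliseconds and capped at the
                // threshold) whenever it is longer than the delay so far.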
                CrawlServer s = controller.getServerCache().getServerFor(curi);
                String ua = curi.getUserAgent();
                if (ua == null) {
                    ua = controller.getOrder().getUserAgent(curi);
                }
                RobotsExclusionPolicy rep = s.getRobots();
                if (rep != null) {
                    long crawlDelay = (long)(1000 * s.getRobots().getCrawlDelay(ua));
                    crawlDelay =
                        (crawlDelay > respectThreshold)
                            ? respectThreshold
                            : crawlDelay;
                    if (crawlDelay > durationToWait) {
                        durationToWait = crawlDelay;
                    }
                }
            }

            long now = System.currentTimeMillis();
            int maxBandwidthKB = ((Integer)getUncheckedAttribute(curi,
                ATTR_MAX_HOST_BANDWIDTH_USAGE)).intValue();
            if (maxBandwidthKB > 0) {
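                // Per-host bandwidth cap: wait at least until the host's
                // earliest-next-emit time, then push that time forward by how
                // long the bytes just downloaded should take at the cap rate
                // (KB/sec * 1.024 = bytes per millisecond).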
                CrawlHost host = controller.getServerCache().getHostFor(curi);
                long minDurationToWait = host.getEarliestNextURIEmitTime()
                    - now;
                float maxBandwidth = maxBandwidthKB * 1.024F;
                long processedBytes = curi.getContentSize();
                host.setEarliestNextURIEmitTime(
                    (long)(processedBytes / maxBandwidth) + now);
                if (minDurationToWait > durationToWait) {
                    durationToWait = minDurationToWait;
                }
            }
        }
        return durationToWait;
    }

    /**
     * Ensure that any overall-bandwidth-usage limit is respected, by pausing as
     * long as necessary.
     *
     * @param now
     * @throws InterruptedException
     */
    private void enforceBandwidthThrottle(long now) throws InterruptedException {
        int maxBandwidthKB = ((Integer)getUncheckedAttribute(null,
            ATTR_MAX_OVERALL_BANDWIDTH_USAGE)).intValue();
        if (maxBandwidthKB > 0) {
            if (maxBandwidthKB != lastMaxBandwidthKB) {
                lastMaxBandwidthKB = maxBandwidthKB;
                processedBytesAfterLastEmittedURI = totalProcessedBytes;
            }
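            // The cap is given in KB/sec; multiplying by 1.024 converts it to
            // bytes per millisecond. Delay the next emit by the time the bytes
            // processed since the last emit should have taken at that rate,
            // carrying over any wait that has not yet been served.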
            long sleepTime = nextURIEmitTime - now;
            float maxBandwidth = maxBandwidthKB * 1.024F;
            long processedBytes = totalProcessedBytes
                - processedBytesAfterLastEmittedURI;
            long shouldHaveEmittedDiff = nextURIEmitTime == 0 ? 0
                : nextURIEmitTime - now;
            nextURIEmitTime = (long)(processedBytes / maxBandwidth) + now
                + shouldHaveEmittedDiff;
            processedBytesAfterLastEmittedURI = totalProcessedBytes;
            if (sleepTime > 0) {
                long targetTime = now + sleepTime;
                now = System.currentTimeMillis();
                while (now < targetTime) {
                    synchronized (this) {
                        if (logger.isLoggable(Level.FINE)) {
                            logger.fine("Frontier waits for: " + sleepTime
                                + "ms to respect bandwidth limit.");
                        }
                        wait(targetTime - now);
                    }
                    now = System.currentTimeMillis();
                }
            }
        }
    }

    /**
     * Take note of any processor-local errors that have been entered into the
     * CrawlURI.
     *
     * @param curi
     */
    protected void logLocalizedErrors(CrawlURI curi) {
        if (curi.containsKey(A_LOCALIZED_ERRORS)) {
            List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS);
            Iterator iter = localErrors.iterator();
            while (iter.hasNext()) {
                Object array[] = {curi, iter.next()};
                controller.localErrors.log(Level.WARNING, curi.getUURI()
                    .toString(), array);
            }
            curi.remove(A_LOCALIZED_ERRORS);
        }
    }

    /**
     * Utility method to return a scratch dir for the given key's temp files.
     * Every key gets its own subdir. To avoid having any one directory with
     * thousands of files, there are also two levels of enclosing directory
     * named by the least-significant hex digits of the key string's java
     * hashcode.
     *
     * @param key
     * @return File representing scratch directory
     */
    protected File scratchDirFor(String key) {
        String hex = Integer.toHexString(key.hashCode());
        while (hex.length() < 4) {
            hex = "0" + hex;
        }
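        // e.g. a key whose hashcode ends in ...1a2b is placed under
        // <state-dir>/2b/1a/<key>.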
        int len = hex.length();
        return new File(this.controller.getStateDisk(), hex.substring(len - 2,
            len)
            + File.separator
            + hex.substring(len - 4, len - 2)
            + File.separator + key);
    }

    protected boolean overMaxRetries(CrawlURI curi) {
        if (curi.getFetchAttempts() >= ((Integer)getUncheckedAttribute(curi,
                ATTR_MAX_RETRIES)).intValue()) {
            return true;
        }
        return false;
    }

    public void importRecoverLog(String pathToLog, boolean retainFailures)
            throws IOException {
        File source = new File(pathToLog);
        if (!source.isAbsolute()) {
            source = new File(getSettingsHandler().getOrder().getController()
                .getDisk(), pathToLog);
        }
        RecoveryJournal.importRecoverLog(source, controller, retainFailures);
    }

    public void kickUpdate() {
    }

    /**
     * Log to the main crawl.log
     *
     * @param curi
     */
    protected void log(CrawlURI curi) {
        curi.aboutToLog();
        Object array[] = {curi};
        this.controller.uriProcessing.log(Level.INFO,
            curi.getUURI().toString(), array);
    }

    protected boolean isDisregarded(CrawlURI curi) {
        switch (curi.getFetchStatus()) {
        case S_ROBOTS_PRECLUDED:
        case S_BLOCKED_BY_CUSTOM_PROCESSOR:
        case S_OUT_OF_SCOPE:
        case S_BLOCKED_BY_USER:
        case S_TOO_MANY_EMBED_HOPS:
        case S_TOO_MANY_LINK_HOPS:
        case S_DELETED_BY_USER:
            return true;
        default:
            return false;
        }
    }

    /**
     * Checks if a recently completed CrawlURI that did not finish successfully
     * needs to be retried (processed again after some time elapses)
     *
     * @param curi
     *            The CrawlURI to check
     * @return True if we need to retry.
     */
    protected boolean needsRetrying(CrawlURI curi) {
        if (overMaxRetries(curi)) {
            return false;
        }

        switch (curi.getFetchStatus()) {
        case HttpStatus.SC_UNAUTHORIZED:
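            // A 401 is only worth retrying if RFC2617 (basic/digest auth)
            // credentials have since been loaded onto the CrawlURI; otherwise
            // another attempt would fail in exactly the same way.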
            boolean loaded = curi.hasRfc2617CredentialAvatar();
            if (!loaded && logger.isLoggable(Level.INFO)) {
                logger.info("Have 401 but no creds loaded " + curi);
            }
            return loaded;
        case S_DEFERRED:
        case S_CONNECT_FAILED:
        case S_CONNECT_LOST:
        case S_DOMAIN_UNRESOLVABLE:
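            // Deferred URIs and transient connection/DNS failures are always
            // eligible for retry (the max-retries cap was checked above).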
            return true;
        default:
            return false;
        }
    }

    /**
     * Canonicalize passed uuri. It would be sweeter if this canonicalize
     * function were encapsulated by that which it canonicalizes, but because
     * settings change with context -- i.e. there may be overrides in operation
     * for a particular URI -- it's not so easy; each CandidateURI would need a
     * reference to the settings system. That's awkward to pass in.
     *
     * @param uuri Candidate URI to canonicalize.
     * @return Canonicalized version of passed <code>uuri</code>.
     */
    protected String canonicalize(UURI uuri) {
        return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
    }

    /**
     * Canonicalize passed CandidateURI. This method differs from
     * {@link #canonicalize(UURI)} in that it takes a look at
     * the CandidateURI context, possibly overriding any canonicalization effect
     * if it could make us miss content. If canonicalization produces a URL that
     * was 'alreadyseen', but the entry in the 'alreadyseen' database did
     * nothing but redirect to the current URL, we won't get the current URL;
     * we'll think we've already seen it. Examples would be archive.org
     * redirecting to www.archive.org or the inverse, www.netarkivet.net
     * redirecting to netarkivet.net (assuming stripWWW rule enabled).
     * <p>Note, this method under some circumstances sets the forceFetch flag.
     *
     * @param cauri CandidateURI to examine.
     * @return Canonicalized <code>cauri</code>.
     */
    protected String canonicalize(CandidateURI cauri) {
        String canon = canonicalize(cauri.getUURI());
        if (cauri.isLocation()) {
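            // This URI is a redirect target. If its 'via' (the redirecting
            // URI) is textually different but canonicalizes to the same form,
            // force the fetch so the 'alreadyseen' check cannot discard it
            // (see the javadoc above for examples).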
            if (!cauri.toString().equals(cauri.getVia().toString()) &&
                    canonicalize(cauri.getVia()).equals(canon)) {
                cauri.setForceFetch(true);
            }
        }
        return canon;
    }

    /**
     * @param cauri CrawlURI we're to get a key for.
     * @return a String token representing a queue
     */
    public String getClassKey(CandidateURI cauri) {
        String queueKey = (String)getUncheckedAttribute(cauri,
            ATTR_FORCE_QUEUE);
        if ("".equals(queueKey)) {
            QueueAssignmentPolicy queueAssignmentPolicy =
                getQueueAssignmentPolicy(cauri);
            queueKey =
                queueAssignmentPolicy.getClassKey(this.controller, cauri);
        }
        return queueKey;
    }

    protected QueueAssignmentPolicy getQueueAssignmentPolicy(CandidateURI cauri) {
        String clsName = (String)getUncheckedAttribute(cauri,
            ATTR_QUEUE_ASSIGNMENT_POLICY);
        try {
            return (QueueAssignmentPolicy) Class.forName(clsName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return RecoveryJournal instance. May be null.
     */
    public FrontierJournal getFrontierJournal() {
        return this.recover;
    }

    public void crawlEnding(String sExitMessage) {
    }

    public void crawlEnded(String sExitMessage) {
        if (logger.isLoggable(Level.INFO)) {
            logger.info("Closing with " + Long.toString(queuedUriCount()) +
                " urls still in queue.");
        }
    }

    public void crawlStarted(String message) {
    }

    public void crawlPausing(String statusMessage) {
    }

    public void crawlPaused(String statusMessage) {
    }

    public void crawlResuming(String statusMessage) {
    }

    public void crawlCheckpoint(File checkpointDir)
            throws Exception {
        if (this.recover == null) {
            return;
        }
        this.recover.checkpoint(checkpointDir);
    }

    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }

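    // The live counters are transient AtomicLongs; copy their values into the
    // plain long fields for serialization and restore them on deserialization.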
    private void writeObject(java.io.ObjectOutputStream out)
            throws IOException {
        queuedUriCount = liveQueuedUriCount.get();
        succeededFetchCount = liveSucceededFetchCount.get();
        failedFetchCount = liveFailedFetchCount.get();
        disregardedUriCount = liveDisregardedUriCount.get();
        out.defaultWriteObject();
    }

    private void readObject(java.io.ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        liveQueuedUriCount = new AtomicLong(queuedUriCount);
        liveSucceededFetchCount = new AtomicLong(succeededFetchCount);
        liveFailedFetchCount = new AtomicLong(failedFetchCount);
        liveDisregardedUriCount = new AtomicLong(disregardedUriCount);
    }
}