1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.frontier;
24
25 import java.io.File;
26 import java.io.IOException;
27 import java.io.PrintWriter;
28 import java.io.Serializable;
29 import java.io.StringWriter;
30 import java.io.Writer;
31 import java.util.ArrayList;
32 import java.util.Date;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.logging.Level;
36 import java.util.logging.Logger;
37 import java.util.regex.Pattern;
38
39 import javax.management.AttributeNotFoundException;
40
41 import org.apache.commons.httpclient.HttpStatus;
42 import org.archive.crawler.datamodel.CandidateURI;
43 import org.archive.crawler.datamodel.CoreAttributeConstants;
44 import org.archive.crawler.datamodel.CrawlServer;
45 import org.archive.crawler.datamodel.CrawlURI;
46 import org.archive.crawler.datamodel.FetchStatusCodes;
47 import org.archive.crawler.datamodel.UriUniqFilter;
48 import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
49 import org.archive.crawler.event.CrawlStatusListener;
50 import org.archive.crawler.framework.CrawlController;
51 import org.archive.crawler.framework.Frontier;
52 import org.archive.crawler.framework.FrontierMarker;
53 import org.archive.crawler.framework.ToeThread;
54 import org.archive.crawler.framework.exceptions.EndedException;
55 import org.archive.crawler.framework.exceptions.FatalConfigurationException;
56 import org.archive.crawler.framework.exceptions.InvalidFrontierMarkerException;
57 import org.archive.crawler.settings.ModuleType;
58 import org.archive.crawler.settings.RegularExpressionConstraint;
59 import org.archive.crawler.settings.SimpleType;
60 import org.archive.crawler.settings.Type;
61 import org.archive.crawler.url.Canonicalizer;
62 import org.archive.crawler.util.BdbUriUniqFilter;
63 import org.archive.net.UURI;
64 import org.archive.queue.MemQueue;
65 import org.archive.queue.Queue;
66 import org.archive.util.ArchiveUtils;
67
68
69 /***
70 * A Frontier that will repeatedly visit all encountered URIs.
71 * <p>
72 * Wait time between visits is configurable and varies based on observed
73 * changes of documents.
74 * <p>
75 * The Frontier borrows many things from HostQueuesFrontier, but implements
76 * an entirely different strategy in issuing URIs and consequently in keeping a
77 * record of discovered URIs.
78 *
79 * @author Kristinn Sigurdsson
80 */
81 public class AdaptiveRevisitFrontier extends ModuleType
82 implements Frontier, FetchStatusCodes, CoreAttributeConstants,
83 AdaptiveRevisitAttributeConstants, CrawlStatusListener, HasUriReceiver {
84
85 private static final long serialVersionUID = -8666872690438543671L;
86
87 private static final Logger logger =
88 Logger.getLogger(AdaptiveRevisitFrontier.class.getName());
89
90 /*** How many multiples of last fetch elapsed time to wait before recontacting
91 * same server */
92 public final static String ATTR_DELAY_FACTOR = "delay-factor";
93 private final static Float DEFAULT_DELAY_FACTOR = new Float(5);
94
95 /*** Always wait this long after one completion before recontacting
96 * same server, regardless of multiple */
97 public final static String ATTR_MIN_DELAY = "min-delay-ms";
98
99
100 private final static Integer DEFAULT_MIN_DELAY = new Integer(2000);
101
102 /*** Never wait more than this long, regardless of multiple */
103 public final static String ATTR_MAX_DELAY = "max-delay-ms";
104
105
106 private final static Integer DEFAULT_MAX_DELAY = new Integer(30000);
107
108 /*** Maximum times to emit a CrawlURI without final disposition */
109 public final static String ATTR_MAX_RETRIES = "max-retries";
110 private final static Integer DEFAULT_MAX_RETRIES = new Integer(30);
111
112 /*** For retryable problems, seconds to wait before a retry */
113 public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";
114
115
116 private final static Long DEFAULT_RETRY_DELAY = new Long(900);
117
118 /*** Maximum simultaneous requests in process to a host (queue) */
119 public final static String ATTR_HOST_VALENCE = "host-valence";
120 private final static Integer DEFAULT_HOST_VALENCE = new Integer(1);
121
122 /*** Number of hops of embeds (ERX) to bump to front of host queue */
123 public final static String ATTR_PREFERENCE_EMBED_HOPS =
124 "preference-embed-hops";
125 private final static Integer DEFAULT_PREFERENCE_EMBED_HOPS = new Integer(0);
126
127 /*** Queue assignment to force on CrawlURIs. Intended to be used
128 * via overrides*/
129 public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";
130 protected final static String DEFAULT_FORCE_QUEUE = "";
131 /*** Acceptable characters in forced queue names.
132 * Word chars, dash, period, comma, colon */
133 protected final static String ACCEPTABLE_FORCE_QUEUE = "[-//w//.,:]*";
134
135 /*** Should the queue assignment ignore www in hostnames, effectively
136 * stripping them away.
137 */
138 public final static String ATTR_QUEUE_IGNORE_WWW = "queue-ignore-www";
139 protected final static Boolean DEFAULT_QUEUE_IGNORE_WWW = new Boolean(false);
140
141 /*** Should the Frontier use a seperate 'already included' datastructure
142 * or rely on the queues'.
143 */
144 public final static String ATTR_USE_URI_UNIQ_FILTER = "use-uri-uniq-filter";
145 protected final static Boolean DEFAULT_USE_URI_UNIQ_FILTER = new Boolean(false);
146
147 /*** The Class to use for QueueAssignmentPolicy
148 */
149 public final static String ATTR_QUEUE_ASSIGNMENT_POLICY = "queue-assignment-policy";
150 protected final static String DEFAULT_QUEUE_ASSIGNMENT_POLICY = HostnameQueueAssignmentPolicy.class.getName();
151
152 private CrawlController controller;
153
154 private AdaptiveRevisitQueueList hostQueues;
155
156 private UriUniqFilter alreadyIncluded;
157
158 private ThreadLocalQueue threadWaiting = new ThreadLocalQueue();
159
160 /*** Policy for assigning CrawlURIs to named queues */
161 private QueueAssignmentPolicy queueAssignmentPolicy = null;
162
163
164 private long succeededFetchCount = 0;
165 private long failedFetchCount = 0;
166
167 private long disregardedUriCount = 0;
168
169 private long totalProcessedBytes = 0;
170
171
172 private boolean shouldPause = false;
173 private boolean shouldTerminate = false;
174
175
176 public AdaptiveRevisitFrontier(String name) {
177 this(name, "AdaptiveRevisitFrontier. EXPERIMENTAL Frontier that " +
178 "will repeatedly visit all " +
179 "encountered URIs. Wait time between visits is configurable" +
180 " and is determined by seperate Processor(s). See " +
181 "WaitEvaluators " +
182 "See documentation for ARFrontier limitations.");
183 }
184
185 public AdaptiveRevisitFrontier(String name, String description) {
186 super(Frontier.ATTR_NAME, description);
187 addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
188 "How many multiples of last fetch elapsed time to wait before " +
189 "recontacting same server", DEFAULT_DELAY_FACTOR));
190 addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
191 "Never wait more than this long, regardless of multiple",
192 DEFAULT_MAX_DELAY));
193 addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
194 "Always wait this long after one completion before recontacting " +
195 "same server, regardless of multiple", DEFAULT_MIN_DELAY));
196 addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
197 "How often to retry fetching a URI that failed to be retrieved.\n" +
198 "If zero, the crawler will get the robots.txt only.",
199 DEFAULT_MAX_RETRIES));
200 addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
201 "How long to wait by default until we retry fetching a" +
202 " URI that failed to be retrieved (seconds). ",
203 DEFAULT_RETRY_DELAY));
204 addElementToDefinition(new SimpleType(ATTR_PREFERENCE_EMBED_HOPS,
205 "Number of embedded (or redirected) hops up to which " +
206 "a URI has higher priority scheduling. For example, if set " +
207 "to 1 (the default), items such as inline images (1-hop " +
208 "embedded resources) will be scheduled ahead of all regular " +
209 "links (or many-hop resources, like nested frames). If set to " +
210 "zero, no preferencing will occur, and embeds/redirects are " +
211 "scheduled the same as regular links.",
212 DEFAULT_PREFERENCE_EMBED_HOPS));
213 Type t;
214 t = addElementToDefinition(new SimpleType(ATTR_HOST_VALENCE,
215 "Maximum number of simultaneous requests to a single" +
216 " host.",
217 DEFAULT_HOST_VALENCE));
218 t.setExpertSetting(true);
219 t = addElementToDefinition(new SimpleType(ATTR_QUEUE_IGNORE_WWW,
220 "If true then documents from x.com, www.x.com and any " +
221 "www[0-9]+.x.com will be assigned to the same queue.",
222 DEFAULT_QUEUE_IGNORE_WWW));
223 t.setExpertSetting(true);
224 t = addElementToDefinition(new SimpleType(
225 ATTR_FORCE_QUEUE,
226 "The queue name into which to force URIs. Should "
227 + "be left blank at global level. Specify a "
228 + "per-domain/per-host override to force URIs into "
229 + "a particular named queue, regardless of the assignment "
230 + "policy in effect (domain or ip-based politeness). "
231 + "This could be used on domains known to all be from "
232 + "the same small set of IPs (eg blogspot, dailykos, etc.) "
233 + "to simulate IP-based politeness, or it could be used if "
234 + "you wanted to enforce politeness over a whole domain, even "
235 + "though the subdomains are split across many IPs.",
236 DEFAULT_FORCE_QUEUE));
237 t.setOverrideable(true);
238 t.setExpertSetting(true);
239 t.addConstraint(new RegularExpressionConstraint(ACCEPTABLE_FORCE_QUEUE,
240 Level.WARNING, "This field must contain only alphanumeric "
241 + "characters plus period, dash, comma, colon, or underscore."));
242 t = addElementToDefinition(new SimpleType(ATTR_USE_URI_UNIQ_FILTER,
243 "If true then the Frontier will use a seperate " +
244 "datastructure to detect and eliminate duplicates.\n" +
245 "This is required for Canonicalization rules to work.",
246 DEFAULT_USE_URI_UNIQ_FILTER));
247 t.setExpertSetting(true);
248 t.setOverrideable(false);
249
250
251 String queueStr = System.getProperty(AbstractFrontier.class.getName() +
252 "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
253 HostnameQueueAssignmentPolicy.class.getName() + " " +
254 IPQueueAssignmentPolicy.class.getName() + " " +
255 BucketQueueAssignmentPolicy.class.getName() + " " +
256 SurtAuthorityQueueAssignmentPolicy.class.getName() + " " +
257 TopmostAssignedSurtQueueAssignmentPolicy.class.getName());
258 Pattern p = Pattern.compile("//s*,//s*|//s+");
259 String [] queues = p.split(queueStr);
260 if (queues.length <= 0) {
261 throw new RuntimeException("Failed parse of " +
262 " assignment queue policy string: " + queueStr);
263 }
264 t = addElementToDefinition(new SimpleType(ATTR_QUEUE_ASSIGNMENT_POLICY,
265 "Defines how to assign URIs to queues. Can assign by host, " +
266 "by ip, and into one of a fixed set of buckets (1k). NOTE: " +
267 "Use of policies other than the default " +
268 "HostnameQueueAssignmentPolicy is untested and provided " +
269 "for use at your own risk. Further, changing this policy " +
270 "during a crawl, or between restarts using the same data " +
271 "directory, is likely to cause unrecoverable problems.",
272 DEFAULT_QUEUE_ASSIGNMENT_POLICY, queues));
273 t.setExpertSetting(true);
274
275
276 CrawlURI.addAlistPersistentMember(A_CONTENT_STATE_KEY);
277 CrawlURI.addAlistPersistentMember(A_TIME_OF_NEXT_PROCESSING);
278 }
279
    /**
     * Prepare the frontier for a crawl: registers this frontier as a
     * crawl-status listener, instantiates the configured
     * QueueAssignmentPolicy, creates the BDB-backed host-queue list and
     * (if enabled) the 'already included' filter, then schedules seeds.
     *
     * @param c the CrawlController that owns this frontier
     * @throws FatalConfigurationException on configuration problems
     * @throws IOException on failure creating the BDB-backed structures
     */
    public synchronized void initialize(CrawlController c)
    throws FatalConfigurationException, IOException {
        controller = c;
        controller.addCrawlStatusListener(this);

        String clsName = (String) getUncheckedAttribute(null,ATTR_QUEUE_ASSIGNMENT_POLICY);
        try {
            queueAssignmentPolicy = (QueueAssignmentPolicy) Class.forName(clsName).newInstance();
        } catch (Exception e) {
            // An unloadable policy class is unrecoverable; abort startup.
            throw new RuntimeException(e);
        }

        hostQueues = new AdaptiveRevisitQueueList(c.getBdbEnvironment(),
            c.getBdbEnvironment().getClassCatalog());

        // Optional duplicate-detection structure; when null, the queues
        // alone are relied on for uniqueness.
        if(((Boolean)getUncheckedAttribute(
                null,ATTR_USE_URI_UNIQ_FILTER)).booleanValue()){
            alreadyIncluded = createAlreadyIncluded();
        } else {
            alreadyIncluded = null;
        }

        loadSeeds();
    }
304
305 /***
306 * Create a UriUniqFilter that will serve as record
307 * of already seen URIs.
308 *
309 * @return A UURISet that will serve as a record of already seen URIs
310 * @throws IOException
311 */
312 protected UriUniqFilter createAlreadyIncluded() throws IOException {
313 UriUniqFilter uuf = new BdbUriUniqFilter(
314 this.controller.getBdbEnvironment());
315 uuf.setDestination(this);
316 return uuf;
317 }
318
319 /***
320 * Loads the seeds
321 * <p>
322 * This method is called by initialize() and kickUpdate()
323 */
324 public void loadSeeds() {
325 Writer ignoredWriter = new StringWriter();
326
327 Iterator iter = this.controller.getScope().seedsIterator(ignoredWriter);
328 while (iter.hasNext()) {
329 CandidateURI caUri =
330 CandidateURI.createSeedCandidateURI((UURI)iter.next());
331 caUri.setSchedulingDirective(CandidateURI.MEDIUM);
332 schedule(caUri);
333 }
334 batchFlush();
335
336 AbstractFrontier.saveIgnoredItems(
337 ignoredWriter.toString(),
338 controller.getDisk());
339 }
340
341 public String getClassKey(CandidateURI cauri) {
342 String queueKey = (String)getUncheckedAttribute(cauri,
343 ATTR_FORCE_QUEUE);
344 if ("".equals(queueKey)) {
345
346 queueKey =
347 queueAssignmentPolicy.getClassKey(controller, cauri);
348
349
350
351 if(((Boolean)getUncheckedAttribute(
352 cauri,ATTR_QUEUE_IGNORE_WWW)).booleanValue()){
353 queueKey = queueKey.replaceAll("^www[0-9]{0,}//.","");
354 }
355 }
356 return queueKey;
357 }
358
    /***
     * Canonicalize passed uuri. Its would be sweeter if this canonicalize
     * function was encapsulated by that which it canonicalizes but because
     * settings change with context -- i.e. there may be overrides in operation
     * for a particular URI -- its not so easy; Each CandidateURI would need a
     * reference to the settings system. That's awkward to pass in.
     *
     * @param uuri Candidate URI to canonicalize.
     * @return Canonicalized version of passed <code>uuri</code>.
     */
    protected String canonicalize(UURI uuri) {
        // Delegates to the crawl-order-wide canonicalization rules.
        return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
    }
372
    /***
     * Canonicalize passed CandidateURI. This method differs from
     * {@link #canonicalize(UURI)} in that it takes a look at
     * the CandidateURI context possibly overriding any canonicalization effect if
     * it could make us miss content. If canonicalization produces an URL that
     * was 'alreadyseen', but the entry in the 'alreadyseen' database did
     * nothing but redirect to the current URL, we won't get the current URL;
     * we'll think we've already see it. Examples would be archive.org
     * redirecting to www.archive.org or the inverse, www.netarkivet.net
     * redirecting to netarkivet.net (assuming stripWWW rule enabled).
     * <p>Note, this method under circumstance sets the forceFetch flag.
     *
     * @param cauri CandidateURI to examine.
     * @return Canonicalized <code>cacuri</code>.
     */
    protected String canonicalize(CandidateURI cauri) {
        String canon = canonicalize(cauri.getUURI());
        if (cauri.isLocation()) {
            // This URI is a redirect target. If it differs textually from
            // its 'via' yet canonicalizes to the same form, the alreadyseen
            // check would wrongly suppress it -- force the fetch instead.
            if (!cauri.toString().equals(cauri.getVia().toString()) &&
                    canonicalize(cauri.getVia()).equals(canon)) {
                cauri.setForceFetch(true);
            }
        }
        return canon;
    }
408
409 /***
410 *
411 * @param caUri The URI to schedule.
412 */
413 protected void innerSchedule(CandidateURI caUri) {
414 CrawlURI curi;
415 if(caUri instanceof CrawlURI) {
416 curi = (CrawlURI) caUri;
417 } else {
418 curi = CrawlURI.from(caUri,System.currentTimeMillis());
419 curi.putLong(A_TIME_OF_NEXT_PROCESSING,
420 System.currentTimeMillis());
421
422 }
423
424 if(curi.getClassKey() == null){
425 curi.setClassKey(getClassKey(curi));
426 }
427
428 if(curi.isSeed() && curi.getVia() != null
429 && curi.flattenVia().length() > 0) {
430
431
432
433
434
435
436 this.controller.getScope().addSeed(curi);
437
438 curi.setSchedulingDirective(CandidateURI.MEDIUM);
439 }
440
441
442 int prefHops = ((Integer) getUncheckedAttribute(curi,
443 ATTR_PREFERENCE_EMBED_HOPS)).intValue();
444 boolean prefEmbed = false;
445 if (prefHops > 0) {
446 int embedHops = curi.getTransHops();
447 if (embedHops > 0 && embedHops <= prefHops
448 && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
449
450
451 curi.setSchedulingDirective(CandidateURI.MEDIUM);
452 prefEmbed = true;
453 }
454 }
455
456
457
458 curi.putLong(A_TIME_OF_NEXT_PROCESSING,
459 System.currentTimeMillis());
460
461 try {
462 logger.finest("scheduling " + curi.toString());
463 AdaptiveRevisitHostQueue hq = getHQ(curi);
464 hq.add(curi,prefEmbed);
465 } catch (IOException e) {
466
467 e.printStackTrace();
468 }
469
470 }
471
472 /***
473 * Get the AdaptiveRevisitHostQueue for the given CrawlURI, creating
474 * it if necessary.
475 *
476 * @param curi CrawlURI for which to get a queue
477 * @return AdaptiveRevisitHostQueue for given CrawlURI
478 * @throws IOException
479 */
480 protected AdaptiveRevisitHostQueue getHQ(CrawlURI curi) throws IOException {
481 AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
482 if(hq == null){
483
484 int valence = DEFAULT_HOST_VALENCE.intValue();
485 try {
486 valence = ((Integer)getAttribute(curi,ATTR_HOST_VALENCE)).intValue();
487 } catch (AttributeNotFoundException e2) {
488 logger.severe("Unable to load valence.");
489 }
490 hq = hostQueues.createHQ(curi.getClassKey(),valence);
491 }
492 return hq;
493 }
494
    /**
     * Buffer a URI on the calling thread's private queue; it is routed
     * into the frontier on the next batchFlush().
     *
     * @param caUri URI to buffer for scheduling
     */
    protected void batchSchedule(CandidateURI caUri) {
        threadWaiting.getQueue().enqueue(caUri);
    }
498
    /** Flush the calling thread's buffered URIs into the frontier. */
    synchronized protected void batchFlush() {
        innerBatchFlush();
    }
502
503 private void innerBatchFlush() {
504 Queue q = threadWaiting.getQueue();
505 while(!q.isEmpty()) {
506 CandidateURI caUri = (CandidateURI)q.dequeue();
507 if(alreadyIncluded != null){
508 String cannon = canonicalize(caUri);
509 System.out.println("Cannon of " + caUri + " is " + cannon);
510 if (caUri.forceFetch()) {
511 alreadyIncluded.addForce(cannon, caUri);
512 } else {
513 alreadyIncluded.add(cannon, caUri);
514 }
515 } else {
516 innerSchedule(caUri);
517 }
518 }
519 }
520
    /***
     * Look up the server record for a URI in the controller's server cache.
     *
     * @param curi
     * @return the CrawlServer to be associated with this CrawlURI
     */
    protected CrawlServer getServer(CrawlURI curi) {
        return this.controller.getServerCache().getServerFor(curi);
    }
528
529
530
531
532 public synchronized CrawlURI next()
533 throws InterruptedException, EndedException {
534 controller.checkFinish();
535
536 while(shouldPause){
537 controller.toePaused();
538 wait();
539 }
540
541 if(shouldTerminate){
542 throw new EndedException("terminated");
543 }
544
545 AdaptiveRevisitHostQueue hq = hostQueues.getTopHQ();
546
547 while(hq.getState() != AdaptiveRevisitHostQueue.HQSTATE_READY){
548
549
550 long waitTime = hq.getNextReadyTime() - System.currentTimeMillis();
551 if(waitTime > 0){
552 wait(waitTime);
553 }
554
555 hq = hostQueues.getTopHQ();
556 }
557
558 if(shouldTerminate){
559
560 throw new EndedException("terminated");
561 }
562
563 try {
564 CrawlURI curi = hq.next();
565
566 logger.fine("Issuing " + curi.toString());
567 long temp = curi.getLong(A_TIME_OF_NEXT_PROCESSING);
568 long currT = System.currentTimeMillis();
569 long overdue = (currT-temp);
570 if(logger.isLoggable(Level.FINER)){
571 String waitI = "not set";
572 if(curi.containsKey(A_WAIT_INTERVAL)){
573 waitI = ArchiveUtils.formatMillisecondsToConventional(
574 curi.getLong(A_WAIT_INTERVAL));
575 }
576 logger.finer("Wait interval: " + waitI +
577 ", Time of next proc: " + temp +
578 ", Current time: " + currT +
579 ", Overdue by: " + overdue + "ms");
580 }
581 if(overdue < 0){
582
583 logger.severe("Time overdue for " + curi.toString() +
584 "is negative (" + overdue + ")!");
585 }
586 curi.putLong(A_FETCH_OVERDUE,overdue);
587 return curi;
588 } catch (IOException e) {
589
590
591 e.printStackTrace();
592 }
593
594 return null;
595 }
596
597
598
599
600 public boolean isEmpty() {
601
602
603 return hostQueues.getSize() == 0;
604 }
605
606
607
608
609 public void schedule(CandidateURI caURI) {
610 batchSchedule(caURI);
611 if(!(Thread.currentThread() instanceof ToeThread)) {
612
613 batchFlush();
614 }
615 }
616
617
618
619
    /**
     * Note that a previously emitted CrawlURI has passed through
     * processing: logs the outcome, bumps the attempt counter, records
     * processor-local errors, then hands off for disposition.
     *
     * @param curi the finished CrawlURI
     */
    public synchronized void finished(CrawlURI curi) {
        logger.fine(curi.toString()+ " " +
            CrawlURI.fetchStatusCodesToString(curi.getFetchStatus()));
        curi.incrementFetchAttempts();
        logLocalizedErrors(curi);

        innerFinished(curi);
    }
628
    /**
     * Dispatch a finished CrawlURI to exactly one disposition based on its
     * fetch status: success, prompt retry, delayed retry, disregard, or
     * failure. Runtime exceptions during disposition are downgraded to a
     * failure disposition of the URI itself.
     *
     * @param curi the finished CrawlURI
     */
    protected synchronized void innerFinished(CrawlURI curi) {
        try {
            // Route any URIs the ToeThread batched while processing.
            innerBatchFlush();

            if (curi.isSuccess()) {
                successDisposition(curi);
            } else if (needsPromptRetry(curi)) {
                // Retry as soon as politeness allows, without error delay.
                reschedule(curi,false);
            } else if (needsRetrying(curi)) {
                // Transient error: retry after the configured delay.
                reschedule(curi,true);
                controller.fireCrawledURINeedRetryEvent(curi);
            } else if(isDisregarded(curi)) {
                // Fetched (or blocked) but must not be kept.
                disregardDisposition(curi);
            } else {
                // Permanent failure.
                failureDisposition(curi);
            }

            // Wake threads in next() waiting on queue-state changes.
            notifyAll();
        } catch (RuntimeException e) {
            curi.setFetchStatus(S_RUNTIME_EXCEPTION);
            // Record the exception on the URI and treat it as a failure.
            logger.warning("RTE in innerFinished() " +
                e.getMessage());
            e.printStackTrace();
            curi.putObject(A_RUNTIME_EXCEPTION, e);
            failureDisposition(curi);
        } catch (AttributeNotFoundException e) {
            logger.severe(e.getMessage());
        }
    }
669
670 /***
671 * Take note of any processor-local errors that have
672 * been entered into the CrawlURI.
673 * @param curi CrawlURI with errors.
674 */
675 private void logLocalizedErrors(CrawlURI curi) {
676 if(curi.containsKey(A_LOCALIZED_ERRORS)) {
677 List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS);
678 Iterator iter = localErrors.iterator();
679 while(iter.hasNext()) {
680 Object array[] = {curi, iter.next()};
681 controller.localErrors.log(Level.WARNING,
682 curi.getUURI().toString(), array);
683 }
684
685 curi.remove(A_LOCALIZED_ERRORS);
686 }
687 }
688
    /***
     * The CrawlURI has been successfully crawled.
     *
     * Annotates and logs the success, updates statistics, computes the
     * time of next visit from the WaitEvaluator-provided interval, and
     * re-queues the URI on its host queue, snoozing the queue for
     * politeness.
     *
     * @param curi The CrawlURI
     */
    protected void successDisposition(CrawlURI curi) {
        curi.aboutToLog();

        long waitInterval = 0;

        // Annotate the crawl-log entry with the wait interval, visit and
        // version counts, and how overdue the fetch was, where known.
        if(curi.containsKey(A_WAIT_INTERVAL)){
            waitInterval = curi.getLong(A_WAIT_INTERVAL);
            curi.addAnnotation("wt:" +
                ArchiveUtils.formatMillisecondsToConventional(
                    waitInterval));
        } else {
            logger.severe("Missing wait interval for " + curi.toString() +
                " WaitEvaluator may be missing.");
        }
        if(curi.containsKey(A_NUMBER_OF_VISITS)){
            curi.addAnnotation(curi.getInt(A_NUMBER_OF_VISITS) + "vis");
        }
        if(curi.containsKey(A_NUMBER_OF_VERSIONS)){
            curi.addAnnotation(curi.getInt(A_NUMBER_OF_VERSIONS) + "ver");
        }
        if(curi.containsKey(A_FETCH_OVERDUE)){
            curi.addAnnotation("ov:" +
                ArchiveUtils.formatMillisecondsToConventional(
                    (curi.getLong(A_FETCH_OVERDUE))));
        }

        Object array[] = { curi };
        controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        succeededFetchCount++;
        totalProcessedBytes += curi.getContentSize();

        // Let interested listeners know of the successful crawl.
        controller.fireCrawledURISuccessfulEvent(curi);

        // Future visits are scheduled at NORMAL priority.
        curi.setSchedulingDirective(CandidateURI.NORMAL);

        // Next visit is 'now' plus the wait interval.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING,
            System.currentTimeMillis()+waitInterval);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());

        // Snooze the host queue for politeness, measured from fetch
        // completion when recorded, otherwise from the current time.
        long wakeupTime = (curi.containsKey(A_FETCH_COMPLETED_TIME)?
            curi.getLong(A_FETCH_COMPLETED_TIME):
            (new Date()).getTime()) + calculateSnoozeTime(curi);

        // Clear per-round state before the URI goes back into the queue.
        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();

        try {
            hq.update(curi, true, wakeupTime, shouldBeForgotten(curi));
        } catch (IOException e) {
            logger.severe("An IOException occured when updating " +
                curi.toString() + "\n" + e.getMessage());
            e.printStackTrace();
        }
    }
763
764 /***
765 * Put near top of relevant hostQueue (but behind anything recently
766 * scheduled 'high')..
767 *
768 * @param curi CrawlURI to reschedule. Its time of next processing is not
769 * modified.
770 * @param errorWait signals if there should be a wait before retrying.
771 * @throws AttributeNotFoundException
772 */
773 protected void reschedule(CrawlURI curi, boolean errorWait)
774 throws AttributeNotFoundException {
775 long delay = 0;
776 if(errorWait){
777 if(curi.containsKey(A_RETRY_DELAY)) {
778 delay = curi.getLong(A_RETRY_DELAY);
779 } else {
780
781 delay = ((Long)getAttribute(ATTR_RETRY_DELAY,curi)).longValue();
782 }
783 }
784
785 long retryTime = (curi.containsKey(A_FETCH_COMPLETED_TIME)?
786 curi.getLong(A_FETCH_COMPLETED_TIME):
787 (new Date()).getTime()) + delay;
788
789 AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
790
791 curi.processingCleanup();
792 if(errorWait){
793 curi.resetDeferrals();
794 }
795 try {
796 hq.update(curi, errorWait, retryTime, shouldBeForgotten(curi));
797 } catch (IOException e) {
798
799 e.printStackTrace();
800 }
801 }
802
    /***
     * The CrawlURI has encountered a problem, and will not
     * be retried.
     *
     * @param curi The CrawlURI
     */
    protected void failureDisposition(CrawlURI curi) {
        // Let interested listeners know of the failed crawl.
        this.controller.fireCrawledURIFailureEvent(curi);

        // Log the failure to the main uri-processing log.
        curi.aboutToLog();
        Object array[] = { curi };
        this.controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        // Runtime exceptions also go to the runtime-errors log.
        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            this.controller.runtimeErrors.log(
                Level.WARNING,
                curi.getUURI().toString(),
                array);
        }
        failedFetchCount++;

        // Reset priority and effectively retire the URI by pushing its
        // next-processing time to the end of time.
        curi.setSchedulingDirective(CandidateURI.NORMAL);

        curi.putLong(A_TIME_OF_NEXT_PROCESSING,Long.MAX_VALUE);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());

        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();
        try {
            // If forgetting the URI, also drop it from the already-included
            // filter so it may be rediscovered via another path later.
            boolean shouldForget = shouldBeForgotten(curi);
            if(shouldForget && alreadyIncluded != null){
                alreadyIncluded.forget(canonicalize(curi.getUURI()),curi);
            }

            hq.update(curi,false, 0, shouldForget);
        } catch (IOException e) {
            // TODO(review): swallowed to stderr only -- consider logging.
            e.printStackTrace();
        }
    }
853
    /**
     * The CrawlURI was fetched (or blocked) but must be disregarded:
     * notifies listeners, logs, and retires the URI on its host queue.
     *
     * @param curi the disregarded CrawlURI
     */
    protected void disregardDisposition(CrawlURI curi) {
        // Let interested listeners know of the disregarded crawl.
        controller.fireCrawledURIDisregardEvent(curi);

        // Log to the main uri-processing log.
        curi.aboutToLog();
        Object array[] = { curi };
        controller.uriProcessing.log(
            Level.INFO,
            curi.getUURI().toString(),
            array);

        disregardedUriCount++;

        // Retire the URI: never due for processing again, NORMAL priority.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING,Long.MAX_VALUE);
        curi.setSchedulingDirective(CandidateURI.NORMAL);

        AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());

        curi.processingCleanup();
        curi.resetDeferrals();
        curi.resetFetchAttempts();
        try {
            hq.update(curi, false, 0, shouldBeForgotten(curi));
        } catch (IOException e) {
            // TODO(review): swallowed to stderr only -- consider logging.
            e.printStackTrace();
        }
    }
886
887 /***
888 * Some URIs, if they recur, deserve another
889 * chance at consideration: they might not be too
890 * many hops away via another path, or the scope
891 * may have been updated to allow them passage.
892 *
893 * @param curi
894 * @return True if curi should be forgotten.
895 */
896 protected boolean shouldBeForgotten(CrawlURI curi) {
897 boolean shouldForget = false;
898
899 switch(curi.getFetchStatus()) {
900 case S_OUT_OF_SCOPE:
901 case S_TOO_MANY_EMBED_HOPS:
902 case S_TOO_MANY_LINK_HOPS:
903 shouldForget = true;
904 default:
905 shouldForget = false;
906 }
907
908 if (!shouldForget) {
909 if (curi.containsKey(A_DISCARD_REVISIT)) {
910 Boolean noRevisit = (Boolean) curi.getObject(A_DISCARD_REVISIT);
911 if (noRevisit) {
912 if (logger.isLoggable(Level.FINE)) {
913 logger.fine("NO_REVISIT tag set for URI: "
914 + curi.getUURI().toString());
915 }
916 shouldForget = true;
917 }
918 }
919 }
920
921 return shouldForget;
922 }
923
    /***
     * Checks if a recently completed CrawlURI that did not finish successfully
     * needs to be retried immediately (processed again as soon as politeness
     * allows.)
     *
     * @param curi The CrawlURI to check
     * @return True if we need to retry promptly.
     * @throws AttributeNotFoundException If problems occur trying to read the
     *  maximum number of retries from the settings framework.
     */
    protected boolean needsPromptRetry(CrawlURI curi)
    throws AttributeNotFoundException {
        // Honor the retry budget first.
        if (curi.getFetchAttempts() >=
            ((Integer)getAttribute(ATTR_MAX_RETRIES, curi)).intValue() ) {
            return false;
        }

        switch (curi.getFetchStatus()) {
            case S_DEFERRED:
                // Deferred URIs are always retried promptly.
                return true;

            case HttpStatus.SC_UNAUTHORIZED:
                // A 401 is promptly retried only when RFC2617 credentials
                // have since been loaded for the URI; otherwise retrying
                // would just repeat the same unauthorized request.
                boolean loaded = curi.hasRfc2617CredentialAvatar();
                if (!loaded) {
                    logger.severe("Have 401 but no creds loaded " + curi);
                }
                return loaded;

            default:
                return false;
        }
    }
962
963 /***
964 * Checks if a recently completed CrawlURI that did not finish successfully
965 * needs to be retried (processed again after some time elapses)
966 *
967 * @param curi The CrawlURI to check
968 * @return True if we need to retry.
969 * @throws AttributeNotFoundException If problems occur trying to read the
970 * maximum number of retries from the settings framework.
971 */
972 protected boolean needsRetrying(CrawlURI curi)
973 throws AttributeNotFoundException {
974
975 if (curi.getFetchAttempts() >=
976 ((Integer)getAttribute(ATTR_MAX_RETRIES,curi)).intValue() ) {
977 return false;
978 } else {
979
980 switch (curi.getFetchStatus()) {
981 case S_CONNECT_FAILED:
982 case S_CONNECT_LOST:
983 case S_DOMAIN_UNRESOLVABLE:
984
985
986
987 return true;
988 default:
989 return false;
990 }
991 }
992 }
993
994 protected boolean isDisregarded(CrawlURI curi) {
995 switch (curi.getFetchStatus()) {
996 case S_ROBOTS_PRECLUDED :
997 case S_OUT_OF_SCOPE :
998 case S_BLOCKED_BY_CUSTOM_PROCESSOR:
999 case S_BLOCKED_BY_USER :
1000 case S_TOO_MANY_EMBED_HOPS :
1001 case S_TOO_MANY_LINK_HOPS :
1002 case S_DELETED_BY_USER :
1003 return true;
1004 default:
1005 return false;
1006 }
1007 }
1008
    /***
     * Calculates how long a host queue needs to be snoozed following the
     * crawling of a URI.
     *
     * The snooze is delay-factor times the fetch duration, clamped to the
     * configured min/max delays; the hardcoded DEFAULT_MIN_DELAY is always
     * enforced as a floor on the returned value.
     *
     * @param curi The CrawlURI
     * @return How long to snooze, in milliseconds.
     */
    protected long calculateSnoozeTime(CrawlURI curi) {
        long durationToWait = 0;
        if (curi.containsKey(A_FETCH_BEGAN_TIME)
            && curi.containsKey(A_FETCH_COMPLETED_TIME)) {

            try{

                long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
                long durationTaken =
                    (completeTime - curi.getLong(A_FETCH_BEGAN_TIME));

                // Politeness: wait a multiple of how long the fetch took.
                durationToWait = (long)(
                    ((Float) getAttribute(ATTR_DELAY_FACTOR, curi))
                        .floatValue() * durationTaken);

                long minDelay =
                    ((Integer) getAttribute(ATTR_MIN_DELAY, curi)).longValue();

                if (minDelay > durationToWait) {
                    // Wait at least the configured minimum.
                    durationToWait = minDelay;
                }

                long maxDelay = ((Integer) getAttribute(ATTR_MAX_DELAY, curi)).longValue();
                if (durationToWait > maxDelay) {
                    // Wait at most the configured maximum.
                    durationToWait = maxDelay;
                }
            } catch (AttributeNotFoundException e) {
                logger.severe("Unable to find attribute. " +
                    curi.toString());
                // Fall back to the default maximum delay.
                durationToWait = DEFAULT_MAX_DELAY.longValue();
            }

        }
        long ret = durationToWait > DEFAULT_MIN_DELAY.longValue() ?
            durationToWait : DEFAULT_MIN_DELAY.longValue();
        logger.finest("Snooze time for " + curi.toString() + " = " + ret );
        return ret;
    }
1057
1058
1059
1060
1061 public synchronized long discoveredUriCount() {
1062 return (this.alreadyIncluded != null) ?
1063 this.alreadyIncluded.count() : hostQueues.getSize();
1064 }
1065
1066
1067
1068
    /** @return number of URIs currently held across all host queues */
    public synchronized long queuedUriCount() {
        return hostQueues.getSize();
    }
1072
1073
1074
1075
1076 public long finishedUriCount() {
1077 return succeededFetchCount+failedFetchCount+disregardedUriCount;
1078 }
1079
1080
1081
1082
    /***
     * @return Number of URIs fetched successfully so far.
     */
    public long succeededFetchCount() {
        return succeededFetchCount;
    }
1086
1087
1088
1089
    /***
     * @return Number of URIs whose fetch failed so far.
     */
    public long failedFetchCount() {
        return failedFetchCount;
    }
1093
1094
1095
1096
1097 public long disregardedUriCount() {
1098 return disregardedUriCount++;
1099 }
1100
1101
1102
1103
    /***
     * @return Total number of bytes processed by the crawl so far.
     */
    public long totalBytesWritten() {
        return totalProcessedBytes;
    }
1107
1108 /***
1109 * Method is not supported by this Frontier implementation..
1110 * @param pathToLog
1111 * @throws IOException
1112 */
1113 public void importRecoverLog(String pathToLog) throws IOException {
1114 throw new IOException("Unsupported by this frontier.");
1115 }
1116
    /***
     * Marker-based URI listing is not supported by this Frontier
     * implementation.
     *
     * @param regexpr Ignored.
     * @param inCacheOnly Ignored.
     * @return Always null.
     */
    public synchronized FrontierMarker getInitialMarker(String regexpr,
            boolean inCacheOnly) {
        return null;
    }
1121
1122
1123
1124
    /***
     * Marker-based URI listing is not supported by this Frontier
     * implementation.
     *
     * @param marker Ignored.
     * @param numberOfMatches Ignored.
     * @param verbose Ignored.
     * @return Always null.
     */
    public synchronized ArrayList<String> getURIsList(FrontierMarker marker,
        int numberOfMatches, boolean verbose)
        throws InvalidFrontierMarkerException {
        // Not supported: no marker state is maintained by this frontier.
        return null;
    }
1131
1132
1133
1134
    /***
     * URI deletion is not supported by this Frontier implementation.
     *
     * @param match Ignored.
     * @return Always 0 (nothing deleted).
     */
    public synchronized long deleteURIs(String match) {
        // Not supported; report zero deletions.
        return 0;
    }
1139
1140
1141
1142
    /***
     * URI deletion is not supported by this Frontier implementation.
     *
     * @param uriMatch Ignored.
     * @param queueMatch Ignored.
     * @return Always 0 (nothing deleted).
     */
    public synchronized long deleteURIs(String uriMatch, String queueMatch) {
        // Not supported; report zero deletions.
        return 0;
    }
1147
1148
1149
1150
    /***
     * No-op: this frontier takes no action when a URI is deleted.
     *
     * @param curi Ignored.
     */
    public synchronized void deleted(CrawlURI curi) {
        // Intentionally empty.
    }
1154
1155 public void considerIncluded(UURI u) {
1156
1157 CrawlURI curi = new CrawlURI(u);
1158 innerSchedule(curi);
1159
1160 }
1161
    /***
     * Responds to an external update request by (re)loading the seed list.
     */
    public void kickUpdate() {
        loadSeeds();
    }
1165
    /***
     * Starts the frontier by clearing any pause state.
     */
    public void start() {
        unpause();
    }
1169
1170 synchronized public void pause() {
1171 shouldPause = true;
1172 notifyAll();
1173 }
1174 synchronized public void unpause() {
1175 shouldPause = false;
1176 notifyAll();
1177 }
1178 synchronized public void terminate() {
1179 shouldTerminate = true;
1180 }
1181
1182
1183
1184
    /***
     * No recovery journal is kept by this Frontier implementation.
     *
     * @return Always null.
     */
    public FrontierJournal getFrontierJournal() {
        return null;
    }
1188
1189 private static class ThreadLocalQueue
1190 extends ThreadLocal<Queue<CandidateURI>> implements Serializable {
1191
1192 private static final long serialVersionUID = 8268977225156462059L;
1193
1194 protected Queue<CandidateURI> initialValue() {
1195 return new MemQueue<CandidateURI>();
1196 }
1197
1198 /***
1199 * @return Queue of 'batched' items
1200 */
1201 public Queue<CandidateURI> getQueue() {
1202 return get();
1203 }
1204 }
1205
1206 /***
1207 * This method is not supported by this Frontier implementation
1208 * @param pathToLog
1209 * @param retainFailures
1210 * @throws IOException
1211 */
1212 public void importRecoverLog(String pathToLog, boolean retainFailures)
1213 throws IOException {
1214 throw new IOException("Unsupported");
1215 }
1216
1217
1218
1219
1220
    /***
     * No named sub-reports are offered by this Frontier implementation.
     *
     * @return An empty array.
     */
    public String[] getReports() {
        // No additional reports beyond the default reportTo() output.
        return new String[] {};
    }
1225
1226
1227
1228
    /***
     * @return A one-line report string built via ArchiveUtils from
     * singleLineReportTo().
     */
    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }
1232
1233
1234
1235
    /***
     * Writes the default (unnamed) frontier report to the given writer.
     *
     * @param writer Destination for the report.
     * @throws IOException
     */
    public void reportTo(PrintWriter writer) throws IOException {
        reportTo(null,writer);
    }
1239
1240
1241
1242
    /***
     * Delegates the one-line report to the host-queues collection.
     *
     * @param w Destination writer.
     * @throws IOException
     */
    public synchronized void singleLineReportTo(PrintWriter w) throws IOException {
        hostQueues.singleLineReportTo(w);
    }
1246
1247
1248
1249
    /***
     * @return Legend describing the fields of the one-line report, as
     * supplied by the host-queues collection.
     */
    public String singleLineLegend() {
        return hostQueues.singleLineLegend();
    }
1253
1254
1255
1256
    /***
     * Delegates the named frontier report to the host-queues collection.
     *
     * @param name Report name (may be null for the default report).
     * @param writer Destination writer.
     */
    public synchronized void reportTo(String name, PrintWriter writer) {
        // All report content comes from the host queues.
        hostQueues.reportTo(name, writer);
    }
1261
1262
1263
1264
    /***
     * No-op: this frontier has no end-of-crawl finalization work.
     */
    @Override
    public void finalTasks() {
        // Intentionally empty.
    }
1269
1270
1271
1272
    /***
     * No-op CrawlStatusListener callback: nothing to do at crawl start.
     *
     * @param message Ignored.
     */
    public void crawlStarted(String message) {
        // Intentionally empty.
    }
1276
1277
1278
1279
    /***
     * No-op CrawlStatusListener callback: cleanup happens in crawlEnded().
     *
     * @param sExitMessage Ignored.
     */
    public void crawlEnding(String sExitMessage) {
        // Intentionally empty.
    }
1283
1284
1285
1286
1287 public void crawlEnded(String sExitMessage) {
1288
1289 if (this.alreadyIncluded != null) {
1290 this.alreadyIncluded.close();
1291 this.alreadyIncluded = null;
1292 }
1293 hostQueues.close();
1294 }
1295
1296
1297
1298
    /***
     * No-op CrawlStatusListener callback.
     *
     * @param statusMessage Ignored.
     */
    public void crawlPausing(String statusMessage) {
        // Intentionally empty.
    }
1302
1303
1304
1305
    /***
     * No-op CrawlStatusListener callback.
     *
     * @param statusMessage Ignored.
     */
    public void crawlPaused(String statusMessage) {
        // Intentionally empty.
    }
1309
1310
1311
1312
    /***
     * No-op CrawlStatusListener callback.
     *
     * @param statusMessage Ignored.
     */
    public void crawlResuming(String statusMessage) {
        // Intentionally empty.
    }
1316
1317
1318
1319
    /***
     * No-op: this frontier does not participate in checkpointing.
     *
     * @param checkpointDir Ignored.
     * @throws Exception
     */
    public void crawlCheckpoint(File checkpointDir) throws Exception {
        // Intentionally empty.
    }
1323
1324
1325
1326
1327 public void receive(CandidateURI item) {
1328 System.out.println("Received " + item);
1329 innerSchedule(item);
1330 }
1331
1332
1333
1334
1335 public FrontierGroup getGroup(CrawlURI curi) {
1336 try {
1337 return getHQ(curi);
1338 } catch (IOException ioe) {
1339 throw new RuntimeException(ioe);
1340 }
1341 }
1342
    /***
     * @return Average depth across host queues, as reported by the
     * host-queues collection.
     */
    public long averageDepth() {
        return hostQueues.getAverageDepth();
    }
1346
    /***
     * @return Congestion ratio, as reported by the host-queues collection.
     */
    public float congestionRatio() {
        return hostQueues.getCongestionRatio();
    }
1350
    /***
     * @return Size of the deepest host queue, as reported by the
     * host-queues collection.
     */
    public long deepestUri() {
        return hostQueues.getDeepestQueueSize();
    }
1354 }