1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.archive.crawler.frontier;
21
22 import java.io.IOException;
23 import java.io.PrintWriter;
24 import java.io.Serializable;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.Queue;
30 import java.util.SortedSet;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.TreeSet;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40
41 import org.apache.commons.collections.Bag;
42 import org.apache.commons.collections.BagUtils;
43 import org.apache.commons.collections.bag.HashBag;
44 import org.apache.commons.lang.StringUtils;
45 import org.archive.crawler.datamodel.CandidateURI;
46 import org.archive.crawler.datamodel.CoreAttributeConstants;
47 import org.archive.crawler.datamodel.CrawlURI;
48 import org.archive.crawler.datamodel.FetchStatusCodes;
49 import org.archive.crawler.datamodel.UriUniqFilter;
50 import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;
51 import org.archive.crawler.framework.CrawlController;
52 import org.archive.crawler.framework.Frontier;
53 import org.archive.crawler.framework.exceptions.EndedException;
54 import org.archive.crawler.framework.exceptions.FatalConfigurationException;
55 import org.archive.crawler.settings.SimpleType;
56 import org.archive.crawler.settings.Type;
57 import org.archive.net.UURI;
58 import org.archive.util.ArchiveUtils;
59 import org.archive.util.ObjectIdentityCache;
60 import org.archive.util.ObjectIdentityMemCache;
61
62 import com.sleepycat.collections.StoredIterator;
63
/**
 * A common Frontier base using several queues to hold pending URIs.
 *
 * Keeps a map of all known 'queues' -- held in memory when the
 * queue-assignment policy bounds the number of queue keys, otherwise in a
 * disk-backed map -- and round-robins between the active queues.
 *
 * @author Gordon Mohr
 * @author Christian Kohlschuetter
 */
73 public abstract class WorkQueueFrontier extends AbstractFrontier
74 implements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver,
75 Serializable {
/** Serialization version for this Serializable frontier. */
private static final long serialVersionUID = 570384305871965843L;
77
78 public class WakeTask extends TimerTask {
79 @Override
80 public void run() {
81 synchronized(snoozedClassQueues) {
82 if(this!=nextWake) {
83
84 return;
85 }
86 wakeQueues();
87 }
88 }
89 }
90
91 /*** truncate reporting of queues at some large but not unbounded number */
92 private static final int REPORT_MAX_QUEUES = 2000;
93
94 /***
95 * If we know that only a small amount of queues is held in memory,
96 * we can avoid using a disk-based BigMap.
97 * This only works efficiently if the WorkQueue does not hold its
98 * entries in memory as well.
99 */
100 private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;
101
102 /***
103 * When a snooze target for a queue is longer than this amount, and
104 * there are already ready queues, deactivate rather than snooze
105 * the current queue -- so other more responsive sites get a chance
106 * in active rotation. (As a result, queue's next try may be much
107 * further in the future than the snooze target delay.)
108 */
109 public final static String ATTR_SNOOZE_DEACTIVATE_MS =
110 "snooze-deactivate-ms";
111 public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(5*60*1000);
112
113 private static final Logger logger =
114 Logger.getLogger(WorkQueueFrontier.class.getName());
115
116 /*** whether to hold queues INACTIVE until needed for throughput */
117 public final static String ATTR_HOLD_QUEUES = "hold-queues";
118 protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(true);
119
120 /*** amount to replenish budget on each activation (duty cycle) */
121 public final static String ATTR_BALANCE_REPLENISH_AMOUNT =
122 "balance-replenish-amount";
123 protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT =
124 new Integer(3000);
125
126 /*** whether to hold queues INACTIVE until needed for throughput */
127 public final static String ATTR_ERROR_PENALTY_AMOUNT =
128 "error-penalty-amount";
129 protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT =
130 new Integer(100);
131
132
133 /*** total expenditure to allow a queue before 'retiring' it */
134 public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";
135 protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(-1);
136
137 /*** cost assignment policy to use (by class name) */
138 public final static String ATTR_COST_POLICY = "cost-policy";
139 protected final static String DEFAULT_COST_POLICY =
140 UnitCostAssignmentPolicy.class.getName();
141
142 /*** target size of ready queues backlog */
143 public final static String ATTR_TARGET_READY_QUEUES_BACKLOG =
144 "target-ready-backlog";
145 protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG =
146 new Integer(50);
147
148 /*** those UURIs which are already in-process (or processed), and
149 thus should not be rescheduled */
150 protected transient UriUniqFilter alreadyIncluded;
151
152 /*** All known queues.
153 */
154 protected transient ObjectIdentityCache<String,WorkQueue> allQueues = null;
155
156
157 /***
158 * All per-class queues whose first item may be handed out.
159 * Linked-list of keys for the queues.
160 */
161 protected BlockingQueue<String> readyClassQueues;
162
163 /*** Target (minimum) size to keep readyClassQueues */
164 protected int targetSizeForReadyQueues;
165
166 /*** single-thread access to ready-filling code */
167 protected transient Semaphore readyFiller = new Semaphore(1);
168
169 /***
170 * All 'inactive' queues, not yet in active rotation.
171 * Linked-list of keys for the queues.
172 */
173 protected Queue<String> inactiveQueues;
174
175 /***
176 * 'retired' queues, no longer considered for activation.
177 * Linked-list of keys for queues.
178 */
179 protected Queue<String> retiredQueues;
180
181 /*** all per-class queues from whom a URI is outstanding */
182 protected Bag inProcessQueues =
183 BagUtils.synchronizedBag(new HashBag());
184
185 /***
186 * All per-class queues held in snoozed state, sorted by wake time.
187 */
188 protected SortedSet<WorkQueue> snoozedClassQueues;
189
190 /*** Timer for tasks which wake head item of snoozedClassQueues */
191 protected transient Timer wakeTimer;
192
193 /*** Task for next wake */
194 protected transient WakeTask nextWake;
195
196 protected WorkQueue longestActiveQueue = null;
197
198 /*** how long to wait for a ready queue when there's nothing snoozed */
199 private static final long DEFAULT_WAIT = 1000;
200
201 /*** a policy for assigning 'cost' values to CrawlURIs */
202 private transient CostAssignmentPolicy costAssignmentPolicy;
203
204 /*** all policies available to be chosen */
205 String[] AVAILABLE_COST_POLICIES = new String[] {
206 ZeroCostAssignmentPolicy.class.getName(),
207 UnitCostAssignmentPolicy.class.getName(),
208 WagCostAssignmentPolicy.class.getName(),
209 AntiCalendarCostAssignmentPolicy.class.getName()};
210
211 /***
212 * Create the CommonFrontier
213 *
214 * @param name
215 * @param description
216 */
217 public WorkQueueFrontier(String name, String description) {
218
219
220 super(Frontier.ATTR_NAME, description);
221 Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES,
222 "Whether to hold newly-created per-host URI work " +
223 "queues until needed to stay busy. If true (the default), " +
224 "queues begin (and collect URIs) in an 'inactive' state, and " +
225 "only when the Frontier needs another queue to keep all " +
226 "ToeThreads busy will new queues be activated. If false, all " +
227 "queues contribute URIs for crawling at all times in a " +
228 "round-robin fashion. (This mode is less likely to make " +
229 "effective use of database and IO caches.)", DEFAULT_HOLD_QUEUES));
230 t.setExpertSetting(true);
231 t.setOverrideable(false);
232 t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT,
233 "Amount to replenish a queue's activity balance when it becomes " +
234 "active. Larger amounts mean more URIs will be tried from the " +
235 "queue before it is deactivated in favor of waiting queues. " +
236 "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT));
237 t.setExpertSetting(true);
238 t.setOverrideable(true);
239 t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT,
240 "Amount to additionally penalize a queue when one of" +
241 "its URIs fails completely. Accelerates deactivation or " +
242 "full retirement of problem queues and unresponsive sites. " +
243 "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT));
244 t.setExpertSetting(true);
245 t.setOverrideable(true);
246 t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET,
247 "Total activity expenditure allowable to a single queue; queues " +
248 "over this expenditure will be 'retired' and crawled no more. " +
249 "Default of -1 means no ceiling on activity expenditures is " +
250 "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));
251 t.setExpertSetting(true);
252 t.setOverrideable(true);
253
254 t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY,
255 "Policy for calculating the cost of each URI attempted. " +
256 "The default UnitCostAssignmentPolicy considers the cost of " +
257 "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES));
258 t.setExpertSetting(true);
259
260 t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS,
261 "Threshold above which any 'snooze' delay will cause the " +
262 "affected queue to go inactive, allowing other queues a " +
263 "chance to rotate into active state. Typically set to be " +
264 "longer than the politeness pauses between successful " +
265 "fetches, but shorter than the connection-failed " +
266 "'retry-delay-seconds'. (Default is 5 minutes.)",
267 DEFAULT_SNOOZE_DEACTIVATE_MS));
268 t.setExpertSetting(true);
269 t.setOverrideable(false);
270 t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG,
271 "Target size for backlog of ready queues. This many queues " +
272 "will be brought into 'ready' state even if a thread is " +
273 "not waiting. Only has effect if 'hold-queues' is true. " +
274 "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG));
275 t.setExpertSetting(true);
276 t.setOverrideable(false);
277 }
278
279 /***
280 * Initializes the Frontier, given the supplied CrawlController.
281 *
282 * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)
283 */
284 public void initialize(CrawlController c)
285 throws FatalConfigurationException, IOException {
286
287 super.initialize(c);
288 this.controller = c;
289
290 initQueuesOfQueues();
291
292 this.targetSizeForReadyQueues = (Integer)getUncheckedAttribute(null,
293 ATTR_TARGET_READY_QUEUES_BACKLOG);
294 if (this.targetSizeForReadyQueues < 1) {
295 this.targetSizeForReadyQueues = 1;
296 }
297 this.wakeTimer = new Timer("waker for " + c.toString());
298
299 try {
300 if (workQueueDataOnDisk()
301 && getQueueAssignmentPolicy(null).maximumNumberOfKeys() >= 0
302 && getQueueAssignmentPolicy(null).maximumNumberOfKeys() <=
303 MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
304 this.allQueues = new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
305 } else {
306 this.allQueues = c.getBigMap("allqueues", WorkQueue.class);
307 if (logger.isLoggable(Level.FINE)) {
308 Iterator<String> i = this.allQueues.keySet().iterator();
309 try {
310 for (; i.hasNext();) {
311 logger.fine((String) i.next());
312 }
313 } finally {
314 StoredIterator.close(i);
315 }
316 }
317 }
318 this.alreadyIncluded = createAlreadyIncluded();
319 initQueue();
320 } catch (IOException e) {
321 e.printStackTrace();
322 throw (FatalConfigurationException)
323 new FatalConfigurationException(e.getMessage()).initCause(e);
324 } catch (Exception e) {
325 e.printStackTrace();
326 throw (FatalConfigurationException)
327 new FatalConfigurationException(e.getMessage()).initCause(e);
328 }
329
330 initCostPolicy();
331
332 loadSeeds();
333 }
334
335 /***
336 * Set up the various queues-of-queues used by the frontier. Override
337 * in implementing subclasses to reduce or eliminate risk of queues
338 * growing without bound.
339 */
340 protected void initQueuesOfQueues() {
341
342
343 readyClassQueues = new LinkedBlockingQueue<String>();
344
345
346 inactiveQueues = new LinkedBlockingQueue<String>();
347
348
349 retiredQueues = new LinkedBlockingQueue<String>();
350
351
352
353 snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
354 }
355
356 /***
357 * Set (or reset after configuration change) the cost policy in effect.
358 *
359 * @throws FatalConfigurationException
360 */
361 private void initCostPolicy() throws FatalConfigurationException {
362 try {
363 costAssignmentPolicy = (CostAssignmentPolicy) Class.forName(
364 (String) getUncheckedAttribute(null, ATTR_COST_POLICY))
365 .newInstance();
366 } catch (Exception e) {
367 e.printStackTrace();
368 throw new FatalConfigurationException(e.getMessage());
369 }
370 }
371
372
373
374
375 public void crawlEnded(String sExitMessage) {
376
377
378 if (this.alreadyIncluded != null) {
379 this.alreadyIncluded.close();
380 this.alreadyIncluded = null;
381 }
382
383 try {
384 closeQueue();
385 } catch (IOException e) {
386
387 e.printStackTrace();
388 }
389 this.wakeTimer.cancel();
390
391 this.allQueues.close();
392 this.allQueues = null;
393 this.inProcessQueues = null;
394 this.readyClassQueues = null;
395 this.snoozedClassQueues = null;
396 this.inactiveQueues = null;
397 this.retiredQueues = null;
398
399 this.costAssignmentPolicy = null;
400
401
402 super.crawlEnded(sExitMessage);
403 this.controller = null;
404 }
405
/**
 * Create a UriUniqFilter that will serve as record
 * of already seen URIs.
 *
 * <p>initialize() stores the result in {@code alreadyIncluded}.
 *
 * @return a UriUniqFilter that will serve as a record of already seen URIs
 * @throws IOException if the filter cannot be created
 */
protected abstract UriUniqFilter createAlreadyIncluded() throws IOException;
414
415 /***
416 * Arrange for the given CandidateURI to be visited, if it is not
417 * already scheduled/completed.
418 *
419 * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
420 */
421 public void schedule(CandidateURI caUri) {
422
423
424 String canon = canonicalize(caUri);
425 if (caUri.forceFetch()) {
426 alreadyIncluded.addForce(canon, caUri);
427 } else {
428 alreadyIncluded.add(canon, caUri);
429 }
430 }
431
432 /***
433 * Accept the given CandidateURI for scheduling, as it has
434 * passed the alreadyIncluded filter.
435 *
436 * Choose a per-classKey queue and enqueue it. If this
437 * item has made an unready queue ready, place that
438 * queue on the readyClassQueues queue.
439 * @param caUri CandidateURI.
440 */
441 public void receive(CandidateURI caUri) {
442 CrawlURI curi = asCrawlUri(caUri);
443 applySpecialHandling(curi);
444 sendToQueue(curi);
445
446 doJournalAdded(curi);
447 }
448
449
450
451
452 protected CrawlURI asCrawlUri(CandidateURI caUri) {
453 CrawlURI curi = super.asCrawlUri(caUri);
454
455 getCost(curi);
456 return curi;
457 }
458
/**
 * Send a CrawlURI to the appropriate subqueue.
 *
 * <p>Under the target queue's monitor, this method does the following:
 * <ul>
 * <li>enqueues the URI;</li>
 * <li>counts it as queued unless the queue is retired;</li>
 * <li>if the queue was not yet 'held', marks it held and either
 * deactivates it (hold-queues on and the ready backlog already at target)
 * or replenishes its balance and makes it ready;</li>
 * <li>records the queue in {@code longestActiveQueue} if it is not retired
 * and now longer than the previous record holder.</li>
 * </ul>
 *
 * @param curi the URI to enqueue
 */
protected void sendToQueue(CrawlURI curi) {
    WorkQueue wq = getQueueFor(curi);
    synchronized (wq) {
        wq.enqueue(this, curi);
        // URIs in a retired queue are not counted as queued.
        if(!wq.isRetired()) {
            incrementQueuedUriCount();
        }
        // A queue that is not held (new, or emptied -- see clearHeld() in
        // next()) must be placed into inactive or ready state now.
        if(!wq.isHeld()) {
            wq.setHeld();
            if(holdQueues() && readyClassQueues.size()>=targetSizeForReadyQueues()) {
                deactivateQueue(wq);
            } else {
                replenishSessionBalance(wq);
                readyQueue(wq);
            }
        }
        // Read the shared field once; other threads may update it too.
        WorkQueue laq = longestActiveQueue;
        if(!wq.isRetired()&&((laq==null) || wq.getCount() > laq.getCount())) {
            longestActiveQueue = wq;
        }
    }
}
486
487 /***
488 * Whether queues should start inactive (only becoming active when needed
489 * to keep the crawler busy), or if queues should start out ready.
490 *
491 * @return true if new queues should held inactive
492 */
493 private boolean holdQueues() {
494 return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
495 .booleanValue();
496 }
497
498 /***
499 * Put the given queue on the readyClassQueues queue
500 * @param wq
501 */
502 private void readyQueue(WorkQueue wq) {
503 try {
504 wq.setActive(this, true);
505 readyClassQueues.put(wq.getClassKey());
506 } catch (InterruptedException e) {
507 e.printStackTrace();
508 System.err.println("unable to ready queue "+wq);
509
510 throw new RuntimeException(e);
511 }
512 }
513
/**
 * Put the given queue on the inactiveQueues queue.
 *
 * <p>Zeroes the queue's session balance, appends its key to
 * {@code inactiveQueues}, then marks it not active.
 *
 * @param wq queue to deactivate
 */
private void deactivateQueue(WorkQueue wq) {
    // Drop any remaining balance; activateInactiveQueue() replenishes it.
    wq.setSessionBalance(0);
    inactiveQueues.add(wq.getClassKey());
    wq.setActive(this, false);
}
530
/**
 * Put the given queue on the retiredQueues queue.
 *
 * <p>The queue's current URI count is subtracted from the queued count,
 * and the queue is flagged retired and not active. kickUpdate() may later
 * return it to the inactive state via unretireQueue().
 *
 * @param wq queue to retire
 */
private void retireQueue(WorkQueue wq) {
    retiredQueues.add(wq.getClassKey());
    // Its URIs no longer count as queued while it is retired.
    decrementQueuedCount(wq.getCount());
    wq.setRetired(true);
    wq.setActive(this, false);
}
548
549 /***
550 * Accomodate any changes in settings.
551 *
552 * @see org.archive.crawler.framework.Frontier#kickUpdate()
553 */
554 public void kickUpdate() {
555 super.kickUpdate();
556 int target = (Integer)getUncheckedAttribute(null,
557 ATTR_TARGET_READY_QUEUES_BACKLOG);
558 if (target < 1) {
559 target = 1;
560 }
561 this.targetSizeForReadyQueues = target;
562 try {
563 initCostPolicy();
564 } catch (FatalConfigurationException fce) {
565 throw new RuntimeException(fce);
566 }
567
568
569
570
571
572 Object key = this.retiredQueues.poll();
573 while (key != null) {
574 WorkQueue q = (WorkQueue)this.allQueues.get((String)key);
575 if(q != null) {
576 unretireQueue(q);
577 }
578 key = this.retiredQueues.poll();
579 }
580 }
/**
 * Restore a retired queue to the 'inactive' state.
 *
 * <p>Reverses retireQueue(): puts the queue back on inactiveQueues, clears
 * its retired flag and re-adds its current URI count to the queued count.
 *
 * @param q queue to un-retire
 */
private void unretireQueue(WorkQueue q) {
    deactivateQueue(q);
    q.setRetired(false);
    // Undo the decrement made when the queue was retired.
    incrementQueuedUriCount(q.getCount());
}
591
/**
 * Return the work queue for the given CrawlURI's classKey. URIs
 * are ordered and politeness-delayed within their 'class'.
 * If the requested queue is not found, a new instance is created.
 *
 * @param curi CrawlURI to base queue on
 * @return the found or created WorkQueue
 */
protected abstract WorkQueue getQueueFor(CrawlURI curi);
601
/**
 * Return the work queue for the given classKey, or null
 * if no such queue exists.
 *
 * @param classKey key to look for
 * @return the found WorkQueue, or null if there is none
 */
protected abstract WorkQueue getQueueFor(String classKey);
610
/**
 * Return the next CrawlURI to be processed (and presumably
 * visited/fetched) by a worker thread.
 *
 * <p>Each pass through the loop does the following:
 * <ul>
 * <li>tops up the ready backlog from inactive queues (one thread at a
 * time);</li>
 * <li>waits up to DEFAULT_WAIT ms for a ready queue;</li>
 * <li>emits that queue's head URI if its class key is unchanged.</li>
 * </ul>
 * URIs whose class key has changed are moved to their proper queue. An
 * empty queue is un-held and dropped from rotation.
 *
 * @return next CrawlURI to be processed; this method loops until one is
 *         available rather than returning null
 * @throws InterruptedException if interrupted while waiting for a ready queue
 * @throws EndedException once {@code shouldTerminate} is set
 * @see org.archive.crawler.framework.Frontier#next()
 */
public CrawlURI next()
throws InterruptedException, EndedException {
    while (true) {
        long now = System.currentTimeMillis();

        // NOTE(review): preNext is defined elsewhere (presumably pause /
        // termination handling) -- confirm its contract there.
        preNext(now);

        // Only one thread at a time refills the ready backlog from the
        // inactive queues; others skip straight to polling.
        if(readyFiller.tryAcquire()) {
            try {
                int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
                while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                    activateInactiveQueue();
                    activationsNeeded--;
                }
            } finally {
                readyFiller.release();
            }
        }

        // Wait up to DEFAULT_WAIT ms for a ready queue's key.
        WorkQueue readyQ = null;
        Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);
        if (key != null) {
            readyQ = (WorkQueue)this.allQueues.get((String)key);
        }
        if (readyQ != null) {
            // Loop until a URI is emitted or the queue turns out empty.
            while(true) {
                CrawlURI curi = null;
                synchronized(readyQ) {
                    curi = readyQ.peek(this);
                    if (curi != null) {
                        // Recompute the class key; it may no longer match
                        // the key the URI was queued under (presumably after
                        // a configuration change).
                        String currentQueueKey = getClassKey(curi);
                        if (currentQueueKey.equals(curi.getClassKey())) {
                            // Still belongs here: hand it out.
                            noteAboutToEmit(curi, readyQ);
                            inProcessQueues.add(readyQ);
                            return curi;
                        }
                        // Mis-filed: take it off this queue and uncount it;
                        // it is re-sent below, outside this queue's lock.
                        curi.setClassKey(currentQueueKey);
                        readyQ.dequeue(this);
                        decrementQueuedCount(1);
                        curi.setHolderKey(null);
                    } else {
                        // Queue is empty: un-hold it so the next URI sent to
                        // it via sendToQueue() puts it back into rotation.
                        readyQ.clearHeld();
                        break;
                    }
                }
                if(curi!=null) {
                    // Re-file under the new class key (counts it again).
                    sendToQueue(curi);
                }
            }
        } else {
            // A polled key with no queue behind it means the bookkeeping
            // is inconsistent.
            if (key != null) {
                logger.severe("Key "+ key +
                    " in readyClassQueues but not allQueues");
            }
        }

        if(shouldTerminate) {
            // Crawl is ending: stop handing out URIs.
            throw new EndedException("shouldTerminate is true");
        }

        if(inProcessQueues.size()==0) {
            // Nothing is in process: ask the already-included filter to
            // flush (presumably releasing any URIs it has buffered).
            this.alreadyIncluded.requestFlush();
        }
    }
}
704
705 private int targetSizeForReadyQueues() {
706 return targetSizeForReadyQueues;
707 }
708
709 /***
710 * Return the 'cost' of a CrawlURI (how much of its associated
711 * queue's budget it depletes upon attempted processing)
712 *
713 * @param curi
714 * @return the associated cost
715 */
716 private int getCost(CrawlURI curi) {
717 int cost = curi.getHolderCost();
718 if (cost == CrawlURI.UNCALCULATED) {
719 cost = costAssignmentPolicy.costOf(curi);
720 curi.setHolderCost(cost);
721 }
722 return cost;
723 }
724
725 /***
726 * Activate an inactive queue, if any are available.
727 */
728 private void activateInactiveQueue() {
729 Object key = this.inactiveQueues.poll();
730 if (key == null) {
731 return;
732 }
733 WorkQueue candidateQ = (WorkQueue)this.allQueues.get((String)key);
734 if(candidateQ != null) {
735 synchronized(candidateQ) {
736 replenishSessionBalance(candidateQ);
737 if(candidateQ.isOverBudget()){
738
739
740 retireQueue(candidateQ);
741 return;
742 }
743 long now = System.currentTimeMillis();
744 long delay_ms = candidateQ.getWakeTime() - now;
745 if(delay_ms>0) {
746
747 snoozeQueue(candidateQ,now,delay_ms);
748 return;
749 }
750 candidateQ.setWakeTime(0);
751 readyQueue(candidateQ);
752 if (logger.isLoggable(Level.FINE)) {
753 logger.fine("ACTIVATED queue: " +
754 candidateQ.getClassKey());
755
756 }
757 }
758 }
759 }
760
761 /***
762 * Replenish the budget of the given queue by the appropriate amount.
763 *
764 * @param queue queue to replenish
765 */
766 private void replenishSessionBalance(WorkQueue queue) {
767 UURI contextUri = queue.getContextUURI(this);
768
769
770 queue.setSessionBalance(((Integer) getUncheckedAttribute(contextUri,
771 ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());
772
773
774 long totalBudget = ((Long)getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue();
775 queue.setTotalBudget(totalBudget);
776 }
777
778 /***
779 * Enqueue the given queue to either readyClassQueues or inactiveQueues,
780 * as appropriate.
781 *
782 * @param wq
783 */
784 private void reenqueueQueue(WorkQueue wq) {
785 if(wq.isOverBudget()) {
786
787 if (logger.isLoggable(Level.FINE)) {
788 logger.fine("DEACTIVATED queue: " +
789 wq.getClassKey());
790 }
791 deactivateQueue(wq);
792 } else {
793 readyQueue(wq);
794 }
795 }
796
797 /***
798 * Wake any queues sitting in the snoozed queue whose time has come.
799 */
800 void wakeQueues() {
801 long now = System.currentTimeMillis();
802 wakeQueuesAsIfAtTime(now);
803 }
804
805 /***
806 * Wake any queues sitting in the snoozed queue whose time has come.
807 */
808 void wakeQueuesAsIfAtTime(long nowish) {
809 synchronized (snoozedClassQueues) {
810 long nextWakeDelay = 0;
811 int wokenQueuesCount = 0;
812 while (true) {
813 if (snoozedClassQueues.isEmpty()) {
814 return;
815 }
816 WorkQueue peek = (WorkQueue) snoozedClassQueues.first();
817 nextWakeDelay = peek.getWakeTime() - nowish;
818 if (nextWakeDelay <= 0) {
819 snoozedClassQueues.remove(peek);
820 peek.setWakeTime(0);
821 reenqueueQueue(peek);
822 wokenQueuesCount++;
823 } else {
824 break;
825 }
826 }
827 this.nextWake = new WakeTask();
828 this.wakeTimer.schedule(nextWake,nextWakeDelay);
829 }
830 }
831
832 /***
833 * Wake all queues as if we were at the end of time
834 */
835 public void forceWakeQueues() {
836 wakeQueuesAsIfAtTime(Long.MAX_VALUE);
837 }
838
/**
 * Note that the previously emitted CrawlURI has completed
 * its processing (for now).
 *
 * <p>One of three outcomes follows:
 * <ul>
 * <li>If the URI carries a retire directive, it stays queued and its
 * whole queue is retired.</li>
 * <li>If it needs retrying, it stays queued and the queue is snoozed for
 * the retry delay (or re-enqueued right away).</li>
 * <li>Otherwise it is removed from its queue and logged; success, disregard
 * or failure is recorded; and the queue is snoozed for the politeness
 * delay or re-enqueued.</li>
 * </ul>
 *
 * @param curi the URI previously returned by next()
 * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
 */
public void finished(CrawlURI curi) {
    long now = System.currentTimeMillis();

    curi.incrementFetchAttempts();
    logLocalizedErrors(curi);
    WorkQueue wq = (WorkQueue) curi.getHolder();
    assert (wq.peek(this) == curi) : "unexpected peek " + wq;
    inProcessQueues.remove(wq, 1);

    if(includesRetireDirective(curi)) {
        // Queue retirement requested: keep the URI queued, save its
        // changes, and retire the whole queue.
        curi.processingCleanup();
        wq.unpeek();
        wq.update(this, curi);
        retireQueue(wq);
        return;
    }

    if (needsRetrying(curi)) {
        // Charge the queue for the attempt unless it was only deferred.
        if(curi.getFetchStatus()!=S_DEFERRED) {
            wq.expend(getCost(curi));
        }
        long delay_sec = retryDelayFor(curi);
        curi.processingCleanup();
        synchronized(wq) {
            wq.unpeek();
            // Persist changes to the URI, which remains queued.
            wq.update(this, curi);
            if (delay_sec > 0) {
                long delay_ms = delay_sec * 1000;
                snoozeQueue(wq, now, delay_ms);
            } else {
                reenqueueQueue(wq);
            }
        }
        // Let listeners and the journal know a retry was scheduled.
        controller.fireCrawledURINeedRetryEvent(curi);
        doJournalRescheduled(curi);
        return;
    }

    // Final disposition: remove the URI from its queue and log it.
    wq.dequeue(this);
    decrementQueuedCount(1);
    log(curi);

    if (curi.isSuccess()) {
        totalProcessedBytes += curi.getRecordedSize();
        incrementSucceededFetchCount();
        // Let listeners know of the success.
        controller.fireCrawledURISuccessfulEvent(curi);
        doJournalFinishedSuccess(curi);
        wq.expend(getCost(curi));
    } else if (isDisregarded(curi)) {
        // Disregarded: counted separately and, unlike the success and
        // failure branches, not charged against the queue's budget.
        incrementDisregardedUriCount();
        // Let listeners know of the disregard.
        controller.fireCrawledURIDisregardEvent(curi);
        doJournalDisregarded(curi);
        // Runtime exceptions are additionally recorded in runtimeErrors.
        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            Object[] array = { curi };
            controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
                .toString(), array);
        }

    } else {
        // Failure: notify listeners, count it, penalize the queue and
        // charge its cost.
        this.controller.fireCrawledURIFailureEvent(curi);
        // Runtime exceptions are additionally recorded in runtimeErrors.
        if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
            Object[] array = { curi };
            this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
                .toString(), array);
        }
        incrementFailedFetchCount();
        // Extra budget penalty for a complete failure.
        wq.noteError(((Integer) getUncheckedAttribute(curi,
            ATTR_ERROR_PENALTY_AMOUNT)).intValue());
        doJournalFinishedFailure(curi);
        wq.expend(getCost(curi));
    }

    // Honor any politeness delay before the queue is used again.
    long delay_ms = politenessDelayFor(curi);
    synchronized(wq) {
        if (delay_ms > 0) {
            snoozeQueue(wq,now,delay_ms);
        } else {
            reenqueueQueue(wq);
        }
    }

    curi.stripToMinimal();
    curi.processingCleanup();

}
949
950 private boolean includesRetireDirective(CrawlURI curi) {
951 return curi.containsKey(A_FORCE_RETIRE) && (Boolean)curi.getObject(A_FORCE_RETIRE);
952 }
953
954 /***
955 * Place the given queue into 'snoozed' state, ineligible to
956 * supply any URIs for crawling, for the given amount of time.
957 *
958 * @param wq queue to snooze
959 * @param now time now in ms
960 * @param delay_ms time to snooze in ms
961 */
962 private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
963 long nextTime = now + delay_ms;
964 wq.setWakeTime(nextTime);
965 long snoozeToInactiveDelayMs = ((Long)getUncheckedAttribute(null,
966 ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
967 if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) {
968 deactivateQueue(wq);
969 } else {
970 synchronized(snoozedClassQueues) {
971 snoozedClassQueues.add(wq);
972 if(wq == snoozedClassQueues.first()) {
973 this.nextWake = new WakeTask();
974 this.wakeTimer.schedule(nextWake, delay_ms);
975 }
976 }
977 }
978 }
979
980 /***
981 * Forget the given CrawlURI. This allows a new instance
982 * to be created in the future, if it is reencountered under
983 * different circumstances.
984 *
985 * @param curi The CrawlURI to forget
986 */
987 protected void forget(CrawlURI curi) {
988 logger.finer("Forgetting " + curi);
989 alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);
990 }
991
992 /*** (non-Javadoc)
993 * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
994 */
995 public long discoveredUriCount() {
996 return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0;
997 }
998
999 /***
1000 * Delete all scheduled URIs matching the given regex.
1001 *
1002 * @param match regex of URIs to delete
1003 * @return Number of items deleted.
1004 */
1005 public long deleteURIs(String uriMatch) {
1006 return deleteURIs(uriMatch,null);
1007 }
1008
1009 /***
1010 * Delete all scheduled URIs matching the given regex, in queues with
1011 * names matching the second given regex.
1012 *
1013 * @param uriMatch regex of URIs to delete
1014 * @param queueMatch regex of queues to affect, or null for all
1015 * @return Number of items deleted.
1016 */
1017 public long deleteURIs(String uriMatch, String queueMatch) {
1018 long count = 0;
1019
1020 Iterator<String> iter = allQueues.keySet().iterator();
1021 while(iter.hasNext()) {
1022 String queueKey = ((String)iter.next());
1023 if(StringUtils.isNotEmpty(queueMatch) && !queueKey.matches(queueMatch)) {
1024
1025 continue;
1026 }
1027 WorkQueue wq = getQueueFor(queueKey);
1028 wq.unpeek();
1029 count += wq.deleteMatching(this, uriMatch);
1030 }
1031 decrementQueuedCount(count);
1032 return count;
1033 }
1034
1035
1036
1037
1038
/** Name of the standard (default) report. */
public static String STANDARD_REPORT = "standard";
/** Name of the report of all nonempty queues, grouped by queue state. */
public static String ALL_NONEMPTY = "nonempty";
/** Name of the report covering every known queue. */
public static String ALL_QUEUES = "all";
/** Names of all reports reportTo() understands. */
protected static String[] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES};
1043
1044 public String[] getReports() {
1045 return REPORTS;
1046 }
1047
1048 /***
1049 * @param w Where to write to.
1050 */
1051 public void singleLineReportTo(PrintWriter w) {
1052 if (this.allQueues == null) {
1053 return;
1054 }
1055 int allCount = allQueues.size();
1056 int inProcessCount = inProcessQueues.uniqueSet().size();
1057 int readyCount = readyClassQueues.size();
1058 int snoozedCount = snoozedClassQueues.size();
1059 int activeCount = inProcessCount + readyCount + snoozedCount;
1060 int inactiveCount = inactiveQueues.size();
1061 int retiredCount = retiredQueues.size();
1062 int exhaustedCount =
1063 allCount - activeCount - inactiveCount - retiredCount;
1064 w.print(allCount);
1065 w.print(" queues: ");
1066 w.print(activeCount);
1067 w.print(" active (");
1068 w.print(inProcessCount);
1069 w.print(" in-process; ");
1070 w.print(readyCount);
1071 w.print(" ready; ");
1072 w.print(snoozedCount);
1073 w.print(" snoozed); ");
1074 w.print(inactiveCount);
1075 w.print(" inactive; ");
1076 w.print(retiredCount);
1077 w.print(" retired; ");
1078 w.print(exhaustedCount);
1079 w.print(" exhausted");
1080 w.flush();
1081 }
1082
1083
1084
1085
1086 public String singleLineLegend() {
1087 return "total active in-process ready snoozed inactive retired exhausted";
1088 }
1089
1090 /***
1091 * This method compiles a human readable report on the status of the frontier
1092 * at the time of the call.
1093 * @param name Name of report.
1094 * @param writer Where to write to.
1095 */
1096 public synchronized void reportTo(String name, PrintWriter writer) {
1097 if(ALL_NONEMPTY.equals(name)) {
1098 allNonemptyReportTo(writer);
1099 return;
1100 }
1101 if(ALL_QUEUES.equals(name)) {
1102 allQueuesReportTo(writer);
1103 return;
1104 }
1105 if(name!=null && !STANDARD_REPORT.equals(name)) {
1106 writer.print(name);
1107 writer.print(" unavailable; standard report:\n");
1108 }
1109 standardReportTo(writer);
1110 }
1111
1112 /*** Compact report of all nonempty queues (one queue per line)
1113 *
1114 * @param writer
1115 */
1116 private void allNonemptyReportTo(PrintWriter writer) {
1117 ArrayList<WorkQueue> inProcessQueuesCopy;
1118 synchronized(this.inProcessQueues) {
1119
1120 @SuppressWarnings("unchecked")
1121 Collection<WorkQueue> inProcess = this.inProcessQueues;
1122 inProcessQueuesCopy = new ArrayList<WorkQueue>(inProcess);
1123 }
1124 writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1125 queueSingleLinesTo(writer, inProcessQueuesCopy.iterator());
1126
1127 writer.print("\n -----===== READY QUEUES =====-----\n");
1128 queueSingleLinesTo(writer, this.readyClassQueues.iterator());
1129
1130 writer.print("\n -----===== SNOOZED QUEUES =====-----\n");
1131 queueSingleLinesTo(writer, this.snoozedClassQueues.iterator());
1132
1133 writer.print("\n -----===== INACTIVE QUEUES =====-----\n");
1134 queueSingleLinesTo(writer, this.inactiveQueues.iterator());
1135
1136 writer.print("\n -----===== RETIRED QUEUES =====-----\n");
1137 queueSingleLinesTo(writer, this.retiredQueues.iterator());
1138 }
1139
1140 /*** Compact report of all nonempty queues (one queue per line)
1141 *
1142 * @param writer
1143 */
1144 private void allQueuesReportTo(PrintWriter writer) {
1145 queueSingleLinesTo(writer, allQueues.keySet().iterator());
1146 }
1147
1148 /***
1149 * Write the single-line reports of all queues in the
1150 * iterator to the writer
1151 *
1152 * @param writer to receive report
1153 * @param iterator over queues of interest.
1154 */
1155 private void queueSingleLinesTo(PrintWriter writer, Iterator<?> iterator) {
1156 Object obj;
1157 WorkQueue q;
1158 boolean legendWritten = false;
1159 while( iterator.hasNext()) {
1160 obj = iterator.next();
1161 if (obj == null) {
1162 continue;
1163 }
1164 q = (obj instanceof WorkQueue)?
1165 (WorkQueue)obj:
1166 (WorkQueue)this.allQueues.get((String)obj);
1167 if(q == null) {
1168 writer.print(" ERROR: "+obj);
1169 }
1170 if(!legendWritten) {
1171 writer.println(q.singleLineLegend());
1172 legendWritten = true;
1173 }
1174 q.singleLineReportTo(writer);
1175 }
1176 }
1177
1178 /***
1179 * @param w Writer to print to.
1180 */
1181 private void standardReportTo(PrintWriter w) {
1182 int allCount = allQueues.size();
1183 int inProcessCount = inProcessQueues.uniqueSet().size();
1184 int readyCount = readyClassQueues.size();
1185 int snoozedCount = snoozedClassQueues.size();
1186 int activeCount = inProcessCount + readyCount + snoozedCount;
1187 int inactiveCount = inactiveQueues.size();
1188 int retiredCount = retiredQueues.size();
1189 int exhaustedCount =
1190 allCount - activeCount - inactiveCount - retiredCount;
1191
1192 w.print("Frontier report - ");
1193 w.print(ArchiveUtils.get12DigitDate());
1194 w.print("\n");
1195 w.print(" Job being crawled: ");
1196 w.print(controller.getOrder().getCrawlOrderName());
1197 w.print("\n");
1198 w.print("\n -----===== STATS =====-----\n");
1199 w.print(" Discovered: ");
1200 w.print(Long.toString(discoveredUriCount()));
1201 w.print("\n");
1202 w.print(" Queued: ");
1203 w.print(Long.toString(queuedUriCount()));
1204 w.print("\n");
1205 w.print(" Finished: ");
1206 w.print(Long.toString(finishedUriCount()));
1207 w.print("\n");
1208 w.print(" Successfully: ");
1209 w.print(Long.toString(succeededFetchCount()));
1210 w.print("\n");
1211 w.print(" Failed: ");
1212 w.print(Long.toString(failedFetchCount()));
1213 w.print("\n");
1214 w.print(" Disregarded: ");
1215 w.print(Long.toString(disregardedUriCount()));
1216 w.print("\n");
1217 w.print("\n -----===== QUEUES =====-----\n");
1218 w.print(" Already included size: ");
1219 w.print(Long.toString(alreadyIncluded.count()));
1220 w.print("\n");
1221 w.print(" pending: ");
1222 w.print(Long.toString(alreadyIncluded.pending()));
1223 w.print("\n");
1224 w.print("\n All class queues map size: ");
1225 w.print(Long.toString(allCount));
1226 w.print("\n");
1227 w.print( " Active queues: ");
1228 w.print(activeCount);
1229 w.print("\n");
1230 w.print(" In-process: ");
1231 w.print(inProcessCount);
1232 w.print("\n");
1233 w.print(" Ready: ");
1234 w.print(readyCount);
1235 w.print("\n");
1236 w.print(" Snoozed: ");
1237 w.print(snoozedCount);
1238 w.print("\n");
1239 w.print(" Inactive queues: ");
1240 w.print(inactiveCount);
1241 w.print("\n");
1242 w.print(" Retired queues: ");
1243 w.print(retiredCount);
1244 w.print("\n");
1245 w.print(" Exhausted queues: ");
1246 w.print(exhaustedCount);
1247 w.print("\n");
1248
1249 w.print("\n -----===== IN-PROCESS QUEUES =====-----\n");
1250 @SuppressWarnings("unchecked")
1251 Collection<WorkQueue> inProcess = inProcessQueues;
1252 ArrayList<WorkQueue> copy = extractSome(inProcess, REPORT_MAX_QUEUES);
1253 appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1254
1255 w.print("\n -----===== READY QUEUES =====-----\n");
1256 appendQueueReports(w, this.readyClassQueues.iterator(),
1257 this.readyClassQueues.size(), REPORT_MAX_QUEUES);
1258
1259 w.print("\n -----===== SNOOZED QUEUES =====-----\n");
1260 copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES);
1261 appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES);
1262
1263 WorkQueue longest = longestActiveQueue;
1264 if (longest != null) {
1265 w.print("\n -----===== LONGEST QUEUE =====-----\n");
1266 longest.reportTo(w);
1267 }
1268
1269 w.print("\n -----===== INACTIVE QUEUES =====-----\n");
1270 appendQueueReports(w, this.inactiveQueues.iterator(),
1271 this.inactiveQueues.size(), REPORT_MAX_QUEUES);
1272
1273 w.print("\n -----===== RETIRED QUEUES =====-----\n");
1274 appendQueueReports(w, this.retiredQueues.iterator(),
1275 this.retiredQueues.size(), REPORT_MAX_QUEUES);
1276
1277 w.flush();
1278 }
1279
1280
1281 /***
1282 * Extract some of the elements in the given collection to an
1283 * ArrayList. This method synchronizes on the given collection's
1284 * monitor. The returned list will never contain more than the
1285 * specified maximum number of elements.
1286 *
1287 * @param c the collection whose elements to extract
1288 * @param max the maximum number of elements to extract
1289 * @return the extraction
1290 */
1291 private static <T> ArrayList<T> extractSome(Collection<T> c, int max) {
1292
1293
1294
1295 int initial = Math.min(c.size() + 10, max);
1296 int count = 0;
1297 ArrayList<T> list = new ArrayList<T>(initial);
1298 synchronized (c) {
1299 Iterator<T> iter = c.iterator();
1300 while (iter.hasNext() && (count < max)) {
1301 list.add(iter.next());
1302 count++;
1303 }
1304 }
1305 return list;
1306 }
1307
1308 /***
1309 * Append queue report to general Frontier report.
1310 * @param w StringBuffer to append to.
1311 * @param iterator An iterator over
1312 * @param total
1313 * @param max
1314 */
1315 protected void appendQueueReports(PrintWriter w, Iterator<?> iterator,
1316 int total, int max) {
1317 Object obj;
1318 WorkQueue q;
1319 for(int count = 0; iterator.hasNext() && (count < max); count++) {
1320 obj = iterator.next();
1321 if (obj == null) {
1322 continue;
1323 }
1324 q = (obj instanceof WorkQueue)?
1325 (WorkQueue)obj:
1326 (WorkQueue)this.allQueues.get((String)obj);
1327 if(q == null) {
1328 w.print("WARNING: No report for queue "+obj);
1329 }
1330 q.reportTo(w);
1331 }
1332 if(total > max) {
1333 w.print("...and " + (total - max) + " more.\n");
1334 }
1335 }
1336
1337 /***
1338 * Force logging, etc. of operator- deleted CrawlURIs
1339 *
1340 * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI)
1341 */
1342 public synchronized void deleted(CrawlURI curi) {
1343
1344 controller.fireCrawledURIDisregardEvent(curi);
1345 log(curi);
1346 incrementDisregardedUriCount();
1347 curi.stripToMinimal();
1348 curi.processingCleanup();
1349 }
1350
1351 public void considerIncluded(UURI u) {
1352 this.alreadyIncluded.note(canonicalize(u));
1353 CrawlURI temp = new CrawlURI(u);
1354 temp.setClassKey(getClassKey(temp));
1355 getQueueFor(temp).expend(getCost(temp));
1356 }
1357
/**
 * Subclass hook to initialize this frontier's queue storage.
 * NOTE(review): exact responsibilities are defined by implementations;
 * confirm against subclasses.
 *
 * @throws IOException if the queue storage cannot be initialized
 */
protected abstract void initQueue() throws IOException;
/**
 * Subclass hook to close this frontier's queue storage.
 * NOTE(review): counterpart of initQueue(); confirm against subclasses.
 *
 * @throws IOException if the queue storage cannot be closed cleanly
 */
protected abstract void closeQueue() throws IOException;
1360
/**
 * Returns <code>true</code> if the WorkQueue implementation of this
 * Frontier stores its workload on disk instead of relying
 * on serialization mechanisms.
 *
 * TODO: rename! (this is a very misleading name) or kill (don't
 * see any implementations that return false)
 *
 * @return a constant boolean value for this class/instance; true when the
 *         queue workload lives on disk
 */
protected abstract boolean workQueueDataOnDisk();
1372
1373
1374 public FrontierGroup getGroup(CrawlURI curi) {
1375 return getQueueFor(curi);
1376 }
1377
1378
1379 public long averageDepth() {
1380 int inProcessCount = inProcessQueues.uniqueSet().size();
1381 int readyCount = readyClassQueues.size();
1382 int snoozedCount = snoozedClassQueues.size();
1383 int activeCount = inProcessCount + readyCount + snoozedCount;
1384 int inactiveCount = inactiveQueues.size();
1385 int totalQueueCount = (activeCount+inactiveCount);
1386 return (totalQueueCount == 0) ? 0 : liveQueuedUriCount.get() / totalQueueCount;
1387 }
1388 public float congestionRatio() {
1389 int inProcessCount = inProcessQueues.uniqueSet().size();
1390 int readyCount = readyClassQueues.size();
1391 int snoozedCount = snoozedClassQueues.size();
1392 int activeCount = inProcessCount + readyCount + snoozedCount;
1393 int inactiveCount = inactiveQueues.size();
1394 return (float)(activeCount + inactiveCount) / (inProcessCount + snoozedCount);
1395 }
1396 public long deepestUri() {
1397 return longestActiveQueue==null ? -1 : longestActiveQueue.getCount();
1398 }
1399
1400
1401
1402
1403
1404 public synchronized boolean isEmpty() {
1405 return liveQueuedUriCount.get() == 0 && alreadyIncluded.pending() == 0;
1406 }
1407 }
1408