1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.frontier;
24
25 import java.io.IOException;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29 import org.archive.crawler.datamodel.CandidateURI;
30 import org.archive.crawler.datamodel.CrawlSubstats;
31 import org.archive.crawler.datamodel.CrawlURI;
32 import org.archive.crawler.framework.Frontier.FrontierGroup;
33 import org.archive.util.ArchiveUtils;
34
35 import com.sleepycat.bind.EntryBinding;
36 import com.sleepycat.bind.serial.ClassCatalog;
37 import com.sleepycat.bind.serial.SerialBinding;
38 import com.sleepycat.bind.serial.StoredClassCatalog;
39 import com.sleepycat.bind.serial.TupleSerialKeyCreator;
40 import com.sleepycat.bind.tuple.StringBinding;
41 import com.sleepycat.bind.tuple.TupleBinding;
42 import com.sleepycat.bind.tuple.TupleInput;
43 import com.sleepycat.bind.tuple.TupleOutput;
44 import com.sleepycat.je.Cursor;
45 import com.sleepycat.je.Database;
46 import com.sleepycat.je.DatabaseConfig;
47 import com.sleepycat.je.DatabaseEntry;
48 import com.sleepycat.je.DatabaseException;
49 import com.sleepycat.je.Environment;
50 import com.sleepycat.je.LockMode;
51 import com.sleepycat.je.OperationStatus;
52 import com.sleepycat.je.SecondaryConfig;
53 import com.sleepycat.je.SecondaryDatabase;
54
55 /***
56 * A priority based queue of CrawlURIs. Each queue should represent
57 * one host (although this is not enforced in this class). Items are ordered
58 * by the scheduling directive and time of next processing (in that order)
59 * and also indexed by the URI.
60 * <p>
61 * The HQ does no calculations on the 'time of next processing.' It always
62 * relies on values already set on the CrawlURI.
63 * <p>
64 * Note: Class is not 'thread safe.' In multi threaded environment the caller
65 * must ensure that two threads do not make overlapping calls.
66 * <p>
67 * Any BDB DatabaseException will be converted to an IOException by public
68 * methods. This includes preserving the original stacktrace, in favor of the
69 * one created for the IOException, so that the true source of the exception
70 * is not lost.
71 *
72 * @author Kristinn Sigurdsson
73 */
74 public class AdaptiveRevisitHostQueue
75 implements AdaptiveRevisitAttributeConstants, FrontierGroup {
76
77
78
79
80 /*** HQ contains no queued CrawlURIs elements. This state only occurs after
81 * queue creation before the first add. After the first item is added the
82 * state can never become empty again. */
83 public static final int HQSTATE_EMPTY = 0;
84 /*** HQ has a CrawlURI ready for processing */
85 public static final int HQSTATE_READY = 1;
86 /*** HQ has maximum number of CrawlURI currently being processed. This number
87 * is either equal to the 'valence' (maximum number of simultaneous
88 * connections to a host) or (if smaller) the total number of CrawlURIs
89 * in the HQ. */
90 public static final int HQSTATE_BUSY = 2;
91 /*** HQ is in a suspended state until it can be woken back up */
92 public static final int HQSTATE_SNOOZED = 3;
93
94
95 /*** Name of the host that this AdaptiveRevisitHostQueue represents */
96 final String hostName;
97 /*** Last known state of HQ -- ALL methods should use getState() to read
98 * this value, never read it directly. */
99 int state;
100 /*** Time (in milliseconds) when the HQ will next be ready to issue a URI
101 * for processing. When setting this value, methods should use the
102 * setter method {@link #setNextReadyTime(long) setNextReadyTime()}
103 */
104 long nextReadyTime;
105 /*** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
106 * Any positive value larger than the current time signifies a taken slot
107 * where the URI has completed processing but the politeness wait has not
108 * ended. <p>
109 * A zero or positive value smaller than the current time in milliseconds
110 * signifies an empty slot.<p>
111 * Any negative value signifies a slot for a URI that is being processed.
112 * <p>
113 * Methods should never write directly to this, rather use the
114 * {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
115 * {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed.
116 */
117 long[] wakeUpTime;
118 /*** Number of simultaneous connections permitted to this host. I.e. this
119 * many URIs can be issued before state of HQ becomes busy until one of
120 * them is returned via the update method. */
121 int valence;
122 /***
123 * Size of queue. That is, the number of CrawlURIs that have been added to
124 * it, including any that are currently being processed.
125 */
126 long size;
127 /*** Number of URIs belonging to this queue that are being processed at the
128 * moment. This number will always be in the range of 0 - valence
129 */
130 long inProcessing;
131 /*** The AdaptiveRevisitHostQueueList that contains this class. This
132 * reference is
133 * maintained to inform the owning class of changes to the sort order
134 * value. Value may be null, in which case no notices are made.*/
135 private AdaptiveRevisitQueueList owner;
136 /*** Logger */
137 private static final Logger logger =
138 Logger.getLogger(AdaptiveRevisitHostQueue.class.getName());
139
140 protected CrawlSubstats substats = new CrawlSubstats();
141
142
143
144 /*** Database containing the URI priority queue, indexed by the
145 * URI string. */
146 protected Database primaryUriDB;
147 /*** Secondary index into {@link #primaryUriDB the primary DB}, URIs indexed
148 * by the time when they can next be processed again. */
149 protected SecondaryDatabase secondaryUriDB;
150 /*** A database containing those URIs that are currently being processed. */
151 protected Database processingUriDB;
152
153 /*** For BDB serialization of objects */
154 protected StoredClassCatalog classCatalog;
155 /*** A binding for the serialization of the primary key (URI string) */
156 protected EntryBinding primaryKeyBinding;
157 /*** A binding for the CrawlURIARWrapper object */
158 protected EntryBinding crawlURIBinding;
159
160
161
162
163 /***
164 * Constructor
165 *
166 * @param hostName Name of the host this queue represents. This name must
167 * be unique for all HQs in the same Environment.
168 * @param env Berkeley DB Environment. All BDB databases created will use
169 * it.
170 * @param catalog Db for bdb class serialization.
171 * @param valence The total number of simultanous URIs that the HQ can issue
172 * for processing. Once this many URIs have been issued for
173 * processing, the HQ will go into {@link #HQSTATE_BUSY busy}
174 * state until at least one of the URI is
175 * {@link #update(CrawlURI, boolean, long) updated}.
176 * Value should be larger then zero. Zero and negative values
177 * will be treated same as 1.
178 *
179 * @throws IOException if an error occurs opening/creating the
180 * database
181 */
182 public AdaptiveRevisitHostQueue(String hostName, Environment env,
183 StoredClassCatalog catalog, int valence)
184 throws IOException {
185 try{
186 if(valence < 1) {
187 this.valence = 1;
188 } else {
189 this.valence = valence;
190 }
191 wakeUpTime = new long[valence];
192 for(int i = 0 ; i < valence ; i++){
193 wakeUpTime[i]=0;
194 }
195
196 inProcessing = 0;
197
198 this.hostName = hostName;
199
200 state = HQSTATE_EMPTY;
201 nextReadyTime = Long.MAX_VALUE;
202
203
204 DatabaseConfig dbConfig = new DatabaseConfig();
205 dbConfig.setTransactional(false);
206 dbConfig.setAllowCreate(true);
207 primaryUriDB = env.openDatabase(null, hostName, dbConfig);
208
209 this.classCatalog = catalog;
210
211
212 DatabaseConfig dbConfig2 = new DatabaseConfig();
213 dbConfig2.setTransactional(false);
214 dbConfig2.setAllowCreate(true);
215 processingUriDB = env.openDatabase(null,
216 hostName + "/processing", dbConfig2);
217
218
219 primaryKeyBinding = TupleBinding.getPrimitiveBinding(String.class);
220
221 crawlURIBinding = new SerialBinding(classCatalog, CrawlURI.class);
222
223
224
225 SecondaryConfig secConfig = new SecondaryConfig();
226 secConfig.setAllowCreate(true);
227 secConfig.setSortedDuplicates(true);
228 secConfig.setKeyCreator(
229 new OrderOfProcessingKeyCreator(classCatalog,CrawlURI.class));
230 secondaryUriDB = env.openSecondaryDatabase(null,
231 hostName+"/timeOfProcessing", primaryUriDB, secConfig);
232
233
234 size = countCrawlURIs();
235 if (size > 0) {
236
237
238 nextReadyTime = peek().getLong(
239 A_TIME_OF_NEXT_PROCESSING);
240
241
242 flushProcessingURIs();
243 state = HQSTATE_READY;
244 }
245 } catch (DatabaseException e) {
246
247 IOException e2 = new IOException(e.getMessage());
248 e2.setStackTrace(e.getStackTrace());
249 throw e2;
250 }
251 }
252
253 /***
254 * Returns the HQ's name
255 * @return the HQ's name
256 */
257 public String getHostName() {
258 return hostName;
259 }
260
261 /***
262 * Add a CrawlURI to this host queue.
263 * <p>
264 * Calls can optionally chose to have the time of next processing value
265 * override existing values for the URI if the existing values are 'later'
266 * then the new ones.
267 *
268 * @param curi The CrawlURI to add.
269 * @param overrideSetTimeOnDups If true then the time of next processing for
270 * the supplied URI will override the any
271 * existing time for it already stored in the HQ.
272 * If false, then no changes will be made to any
273 * existing values of the URI. Note: Will never
274 * override with a later time.
275 * @throws IOException When an error occurs accessing the database
276 */
277 public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
278 throws IOException{
279 if(logger.isLoggable(Level.FINER)){
280 logger.finer("Adding " + curi.toString());
281 }
282 try{
283 if(inProcessing(curi.toString())){
284
285
286 return;
287 }
288
289 OperationStatus opStatus = strictAdd(curi,false);
290
291 long curiProcessingTime = curi.getLong(
292 A_TIME_OF_NEXT_PROCESSING);
293
294 if (opStatus == OperationStatus.KEYEXIST){
295
296
297
298
299
300 boolean update = false;
301 CrawlURI curiExisting = getCrawlURI(curi.toString());
302 long oldCuriProcessingTime = curiExisting.getLong(
303 A_TIME_OF_NEXT_PROCESSING);
304 if(curi.getSchedulingDirective() <
305 curiExisting.getSchedulingDirective()){
306
307
308 curiExisting.setSchedulingDirective(
309 curi.getSchedulingDirective());
310 update = true;
311 }
312 if( (curiProcessingTime < oldCuriProcessingTime)
313 && (overrideSetTimeOnDups || update)){
314
315
316
317
318 curiExisting.putLong(
319 A_TIME_OF_NEXT_PROCESSING,
320 curiProcessingTime);
321 update = true;
322 }
323 if(update){
324 opStatus = strictAdd(curiExisting,true);
325 } else {
326 return;
327 }
328 } else if(opStatus == OperationStatus.SUCCESS) {
329
330 size++;
331 }
332
333
334
335 if(opStatus == OperationStatus.SUCCESS){
336 if (curiProcessingTime < nextReadyTime){
337
338 setNextReadyTime(curiProcessingTime);
339 }
340 if(state == HQSTATE_EMPTY){
341
342 state = HQSTATE_READY;
343 }
344 } else {
345
346 throw new IOException("Error on add into database for " +
347 "CrawlURI " + curi.toString() + ". " +
348 opStatus.toString());
349 }
350 } catch (DatabaseException e) {
351
352 IOException e2 = new IOException(e.getMessage());
353 e2.setStackTrace(e.getStackTrace());
354 throw e2;
355 }
356 reorder();
357 }
358
359 /***
360 * An internal method for adding URIs to the queue.
361 *
362 * @param curi The CrawlURI to add
363 * @param overrideDuplicates If true then any existing CrawlURI in the DB
364 * will be overwritten. If false insert into the
365 * queue is only performed if the key doesn't
366 * already exist.
367 * @return The OperationStatus object returned by the put method.
368 *
369 * @throws DatabaseException
370 */
371 protected OperationStatus strictAdd(CrawlURI curi,
372 boolean overrideDuplicates)
373 throws DatabaseException{
374 DatabaseEntry keyEntry = new DatabaseEntry();
375 DatabaseEntry dataEntry = new DatabaseEntry();
376 primaryKeyBinding.objectToEntry(curi.toString(), keyEntry);
377 crawlURIBinding.objectToEntry(curi, dataEntry);
378 OperationStatus opStatus = null;
379 if(overrideDuplicates){
380 opStatus = primaryUriDB.put(null,keyEntry,dataEntry);
381 } else {
382 opStatus = primaryUriDB.putNoOverwrite(null,keyEntry,dataEntry);
383 }
384
385 return opStatus;
386 }
387
388 /***
389 * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs
390 * flushed will have their 'time of next fetch' maintained and the
391 * nextReadyTime will be updated if needed.
392 * <p>
393 * No change is made to the list of available slots.
394 *
395 * @throws DatabaseException if one occurs while flushing
396 */
397 protected void flushProcessingURIs() throws DatabaseException, IOException {
398 Cursor processingCursor = processingUriDB.openCursor(null,null);
399 DatabaseEntry keyEntry = new DatabaseEntry();
400 DatabaseEntry dataEntry = new DatabaseEntry();
401
402 while(true){
403 OperationStatus opStatus = processingCursor.getFirst(
404 keyEntry, dataEntry, LockMode.DEFAULT);
405
406 if(opStatus == OperationStatus.SUCCESS){
407
408 CrawlURI curi =
409 (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
410
411 deleteInProcessing(curi.toString());
412
413 strictAdd(curi,false);
414
415
416 long curiNextReadyTime = curi.getLong(
417 A_TIME_OF_NEXT_PROCESSING);
418 if(curiNextReadyTime<nextReadyTime){
419 setNextReadyTime(curiNextReadyTime);
420 }
421 } else {
422
423 processingCursor.close();
424 return;
425 }
426 }
427 }
428
429 /***
430 * Count all entries in both primaryUriDB and processingUriDB.
431 * <p>
432 * This method is needed since BDB does not provide a simple way of counting
433 * entries.
434 * <p>
435 * Note: This is an expensive operation, requires a loop through the entire
436 * queue!
437 * @return the number of distinct CrawlURIs in the HQ.
438 * @throws DatabaseException
439 */
440 protected long countCrawlURIs() throws DatabaseException{
441
442
443 long count = 0;
444
445 DatabaseEntry keyEntry = new DatabaseEntry();
446 DatabaseEntry dataEntry = new DatabaseEntry();
447
448
449 Cursor primaryCursor = primaryUriDB.openCursor(null,null);
450 OperationStatus opStatus = primaryCursor.getFirst(keyEntry,
451 dataEntry,
452 LockMode.DEFAULT);
453 while(opStatus == OperationStatus.SUCCESS){
454 count++;
455 opStatus = primaryCursor.getNext(keyEntry,
456 dataEntry,
457 LockMode.DEFAULT);
458 }
459
460 primaryCursor.close();
461
462
463 Cursor processingCursor = processingUriDB.openCursor(null,null);
464 opStatus = processingCursor.getFirst(keyEntry,
465 dataEntry,
466 LockMode.DEFAULT);
467 while(opStatus == OperationStatus.SUCCESS){
468 count++;
469 opStatus = processingCursor.getNext(keyEntry,
470 dataEntry,
471 LockMode.DEFAULT);
472 }
473
474 processingCursor.close();
475 return count;
476 }
477
478 /***
479 * Returns true if this HQ has a CrawlURI matching the uri string currently
480 * being processed. False otherwise.
481 *
482 * @param uri Uri to check
483 * @return true if this HQ has a CrawlURI matching the uri string currently
484 * being processed. False otherwise.
485 *
486 * @throws DatabaseException
487 */
488 protected boolean inProcessing(String uri) throws DatabaseException{
489 DatabaseEntry keyEntry = new DatabaseEntry();
490 DatabaseEntry dataEntry = new DatabaseEntry();
491
492 StringBinding.stringToEntry(uri,keyEntry);
493
494 OperationStatus opStatus = processingUriDB.get(null,
495 keyEntry,
496 dataEntry,
497 LockMode.DEFAULT);
498
499 if (opStatus == OperationStatus.SUCCESS){
500 return true;
501 }
502
503 return false;
504 }
505
506 /***
507 * Removes a URI from the list of URIs belonging to this HQ and are
508 * currently being processed.
509 * <p>
510 * Returns true if successful, false if the URI was not found.
511 *
512 * @param uri The URI string of the CrawlURI to delete.
513 *
514 * @throws DatabaseException
515 * @throws IllegalStateException if the URI was not on the list
516 */
517 protected void deleteInProcessing(String uri) throws DatabaseException, IOException {
518 DatabaseEntry keyEntry = new DatabaseEntry();
519
520 StringBinding.stringToEntry(uri, keyEntry);
521
522 OperationStatus opStatus = processingUriDB.delete(null, keyEntry);
523
524 if (opStatus != OperationStatus.SUCCESS) {
525 if (opStatus == OperationStatus.NOTFOUND) {
526 throw new IllegalStateException("Trying to deleta a "
527 + "non-existant URI from the list of URIs being "
528 + "processed. HQ: " + hostName + ", CrawlURI: " + uri);
529 }
530 throw new IOException("Error occured deleting URI: " + uri
531 + " from HQ " + hostName + " list "
532 + "of URIs currently being processed. "
533 + opStatus.toString());
534 }
535 }
536
537 /***
538 * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are
539 * being processed at the moment.
540 *
541 * @param curi
542 * The CrawlURI to add to the list
543 * @throws DatabaseException
544 * @throws IllegalStateException
545 * if the CrawlURI is already in the list of URIs being
546 * processed.
547 */
548 protected void addInProcessing(CrawlURI curi) throws DatabaseException,
549 IllegalStateException {
550 DatabaseEntry keyEntry = new DatabaseEntry();
551 DatabaseEntry dataEntry = new DatabaseEntry();
552
553 StringBinding.stringToEntry(curi.toString(), keyEntry);
554 crawlURIBinding.objectToEntry(curi, dataEntry);
555
556 OperationStatus opStatus = processingUriDB.putNoOverwrite(null,
557 keyEntry, dataEntry);
558
559 if (opStatus != OperationStatus.SUCCESS) {
560 if (opStatus == OperationStatus.KEYEXIST) {
561 throw new IllegalStateException("Can not insert duplicate "
562 + "URI into list of URIs being processed. " + "HQ: "
563 + hostName + ", CrawlURI: " + curi.toString());
564 }
565 throw new IllegalStateException("Error occured adding CrawlURI: "
566 + curi.toString() + " to HQ " + hostName + " list "
567 + "of URIs currently being processed. "
568 + opStatus.toString());
569 }
570 }
571
572 /***
573 * Returns the CrawlURI associated with the specified URI (string) or null
574 * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
575 * it is not considered to be <i>queued </i> and this method will return
576 * null for any such URIs.
577 *
578 * @param uri
579 * A string representing the URI
580 * @return the CrawlURI associated with the specified URI (string) or null
581 * if no such CrawlURI is queued in this HQ.
582 *
583 * @throws DatabaseException
584 * if a errors occurs reading the database
585 */
586 protected CrawlURI getCrawlURI(String uri) throws DatabaseException{
587 DatabaseEntry keyEntry = new DatabaseEntry();
588 DatabaseEntry dataEntry = new DatabaseEntry();
589
590 primaryKeyBinding.objectToEntry(uri,keyEntry);
591 primaryUriDB.get(null,keyEntry,dataEntry,LockMode.DEFAULT);
592
593 CrawlURI curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
594
595 return curi;
596 }
597
598 /***
599 * Update CrawlURI that has completed processing.
600 *
601 * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
602 * {@link #next() next()} method.
603 * @param needWait If true then the URI was processed successfully,
604 * requiring a period of suspended action on that host. If
605 * valence is > 1 then seperate times are maintained for
606 * each slot.
607 * @param wakeupTime If new state is
608 * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
609 * then this parameter should contain the time (in
610 * milliseconds) when it will be safe to wake the HQ up
611 * again. Otherwise this parameter will be ignored.
612 *
613 * @throws IllegalStateException if the CrawlURI
614 * does not match a CrawlURI issued for crawling by this HQ's
615 * {@link AdaptiveRevisitHostQueue#next() next()}.
616 * @throws IOException if an error occurs accessing the database
617 */
618 public void update(CrawlURI curi,
619 boolean needWait,
620 long wakeupTime)
621 throws IllegalStateException, IOException{
622 update(curi,needWait,wakeupTime,false);
623 }
624
625
626 /***
627 * Update CrawlURI that has completed processing.
628 *
629 * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's
630 * {@link #next() next()} method.
631 * @param needWait If true then the URI was processed successfully,
632 * requiring a period of suspended action on that host. If
633 * valence is > 1 then seperate times are maintained for
634 * each slot.
635 * @param wakeupTime If new state is
636 * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed}
637 * then this parameter should contain the time (in
638 * milliseconds) when it will be safe to wake the HQ up
639 * again. Otherwise this parameter will be ignored.
640 * @param forgetURI If true, the URI will be deleted from the queue.
641 *
642 * @throws IllegalStateException if the CrawlURI
643 * does not match a CrawlURI issued for crawling by this HQ's
644 * {@link AdaptiveRevisitHostQueue#next() next()}.
645 * @throws IOException if an error occurs accessing the database
646 */
647 public void update(CrawlURI curi,
648 boolean needWait,
649 long wakeupTime,
650 boolean forgetURI)
651 throws IllegalStateException, IOException{
652 if (logger.isLoggable(Level.FINE)) {
653 logger.fine("Updating " + curi.toString());
654 }
655 try{
656
657 if (forgetURI == false){
658 OperationStatus opStatus = strictAdd(curi,false);
659 if(opStatus != OperationStatus.SUCCESS){
660 if(opStatus == OperationStatus.KEYEXIST){
661 throw new IllegalStateException("Trying to update a" +
662 " CrawlURI failed because it was in the queue" +
663 " of URIs waiting for processing. URIs currently" +
664 " being processsed can never be in that queue." +
665 " HQ: " + hostName + ", CrawlURI: " +
666 curi.toString());
667 }
668 }
669
670
671 long curiTimeOfNextProcessing = curi.getLong(
672 A_TIME_OF_NEXT_PROCESSING);
673 if(nextReadyTime > curiTimeOfNextProcessing){
674 setNextReadyTime(curiTimeOfNextProcessing);
675 }
676
677 } else {
678 size--;
679 }
680
681
682 deleteInProcessing(curi.toString());
683
684 inProcessing--;
685
686
687 if(needWait==false){
688
689 wakeupTime = 0;
690 }
691
692 updateWakeUpTimeSlot(wakeupTime);
693 } catch (DatabaseException e) {
694
695 IOException e2 = new IOException(e.getMessage());
696 e2.setStackTrace(e.getStackTrace());
697 throw e2;
698 }
699 }
700
701 /***
702 * Returns the 'top' URI in the AdaptiveRevisitHostQueue.
703 * <p>
704 * HQ state will be set to {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} if this
705 * method returns normally.
706
707 *
708 * @return a CrawlURI ready for processing
709 *
710 * @throws IllegalStateException if the HostQueues current state is not
711 * ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready}
712 * @throws IOException if an error occurs reading from the database
713 */
714 public CrawlURI next() throws IllegalStateException, IOException{
715 try{
716
717 if(getState()!=HQSTATE_READY || useWakeUpTimeSlot()==false){
718 throw new IllegalStateException("Can not issue next URI when " +
719 "HQ " + hostName + " state is " + getStateByName());
720 }
721
722 DatabaseEntry keyEntry = new DatabaseEntry();
723
724
725 CrawlURI curi = peek();
726
727
728 addInProcessing(curi);
729
730
731 primaryKeyBinding.objectToEntry(curi.toString(),keyEntry);
732 OperationStatus opStatus = primaryUriDB.delete(null,keyEntry);
733
734 if(opStatus != OperationStatus.SUCCESS){
735 throw new IOException("Error occured removing URI: " +
736 curi.toString() + " from HQ " + hostName +
737 " priority queue for processing. " + opStatus.toString());
738 }
739
740
741 CrawlURI top = peek();
742 long nextReady = Long.MAX_VALUE;
743 if(top != null){
744 nextReady = top.getLong(
745 A_TIME_OF_NEXT_PROCESSING);
746 }
747 inProcessing++;
748 setNextReadyTime(nextReady);
749 logger.fine("Issuing " + curi.toString());
750 return curi;
751 } catch (DatabaseException e) {
752
753 IOException e2 = new IOException(e.getMessage());
754 e2.setStackTrace(e.getStackTrace());
755 throw e2;
756 }
757 }
758
759 /***
760 * Returns the URI with the earliest time of next processing. I.e. the URI
761 * at the head of this host based priority queue.
762 * <p>
763 * Note: This method will return the head CrawlURI regardless of wether it
764 * is safe to start processing it or not. CrawlURI will remain in the queue.
765 * The returned CrawlURI should only be used for queue inspection, it can
766 * <i>not</i> be updated and returned to the queue. To get URIs ready for
767 * processing use {@link #next() next()}.
768 *
769 * @return the URI with the earliest time of next processing or null if
770 * the queue is empty or all URIs are currently being processed.
771 * @throws IllegalStateException
772 *
773 * @throws IOException if an error occurs reading from the database
774 */
775 public CrawlURI peek() throws IllegalStateException, IOException{
776 try{
777
778 DatabaseEntry keyEntry = new DatabaseEntry();
779 DatabaseEntry dataEntry = new DatabaseEntry();
780
781 CrawlURI curi = null;
782 Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
783
784 OperationStatus opStatus =
785 secondaryCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
786
787 if( opStatus == OperationStatus.SUCCESS){
788 curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
789 } else {
790 if( opStatus == OperationStatus.NOTFOUND ){
791 curi = null;
792 } else {
793 throw new IOException("Error occured in " +
794 "AdaptiveRevisitHostQueue.peek()." + opStatus.toString());
795 }
796 }
797 secondaryCursor.close();
798 return curi;
799 } catch (DatabaseException e) {
800
801 IOException e2 = new IOException(e.getMessage());
802 e2.setStackTrace(e.getStackTrace());
803 throw e2;
804 }
805 }
806
807
808
809 /***
810 * Returns the current state of the HQ.
811 *
812 * @return the current state of the HQ.
813 *
814 * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY
815 * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY
816 * @see AdaptiveRevisitHostQueue#HQSTATE_READY
817 * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED
818 */
819 public int getState(){
820 if(state != HQSTATE_EMPTY){
821
822 if(isBusy()){
823 state = HQSTATE_BUSY;
824 } else {
825 long currentTime = System.currentTimeMillis();
826 long wakeTime = getEarliestWakeUpTimeSlot();
827
828 if(wakeTime > currentTime || nextReadyTime > currentTime){
829 state = HQSTATE_SNOOZED;
830 } else {
831 state = HQSTATE_READY;
832 }
833 }
834 }
835 return state;
836 }
837
838 /***
839 * Returns the time when the HQ will next be ready to issue a URI.
840 * <p>
841 * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this
842 * time will be in the future and reflects either the time when the HQ will
843 * again be able to issue URIs for processing because politness constraints
844 * have ended, or when a URI next becomes available for visit, whichever is
845 * larger.
846 * <p>
847 * If the queue is in a {@link #HQSTATE_READY ready} state this time will
848 * be in the past and reflect the earliest time when the HQ had a URI ready
849 * for processing, taking time spent snoozed for politness concerns into
850 * account.
851 * <p>
852 * If the HQ is in any other state then the return value of this method is
853 * equal to Long.MAX_VALUE.
854 * <p>
855 * This value may change each time a URI is added, issued or updated.
856 *
857 * @return the time when the HQ will next be ready to issue a URI
858 */
859 public long getNextReadyTime(){
860 if(getState()==HQSTATE_BUSY || getState()==HQSTATE_EMPTY){
861
862 return Long.MAX_VALUE;
863 }
864 long wakeTime = getEarliestWakeUpTimeSlot();
865 return nextReadyTime > wakeTime ? nextReadyTime : wakeTime;
866 }
867
868 /***
869 * Updates nextReadyTime (if smaller) with the supplied value
870 * @param newTime the new value of nextReady Time;
871 */
872 protected void setNextReadyTime(long newTime){
873 if(logger.isLoggable(Level.FINEST)){
874 logger.finest("Setting next ready to new value " + newTime +
875 " from " + getNextReadyTime());
876 }
877 nextReadyTime=newTime;
878 reorder();
879 }
880
881 /***
882 * Method is called whenever something has been done that might have
883 * changed the value of the 'published' time of next ready. If an owner
884 * has been specified it will be notified that the value may have changed..
885 */
886 protected void reorder(){
887 if(owner != null){
888 owner.reorder(this);
889 }
890 }
891
892
893 /***
894 * Same as {@link #getState() getState()} except this method returns a
895 * human readable name for the state instead of its constant integer value.
896 * <p>
897 * Should only be used for reports, error messages and other strings
898 * intended for human eyes.
899 *
900 * @return the human readable name of the current state
901 */
902 public String getStateByName() {
903 switch(getState()){
904 case HQSTATE_BUSY : return "busy";
905 case HQSTATE_EMPTY : return "empty";
906 case HQSTATE_READY : return "ready";
907 case HQSTATE_SNOOZED : return "snoozed";
908 }
909
910
911 return "undefined";
912 }
913
914 /***
915 * Returns the size of the HQ. That is, the number of URIs queued,
916 * including any that are currently being processed.
917 *
918 * @return the size of the HQ.
919 */
920 public long getSize(){
921 return size;
922 }
923
924 /***
925 * Set the AdaptiveRevisitQueueList object that contains this HQ. Will cause
926 * that
927 * object to be notified (via
928 * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
929 * reorder()} when the
930 * value used for sorting the list of HQs changes.
931 * @param owner the ARHostQueueList object that contains this HQ.
932 */
933 public void setOwner(AdaptiveRevisitQueueList owner) {
934 this.owner = owner;
935 }
936
937 /***
938 * Cleanup all open Berkeley Database objects.
939 * <p>
940 * Does <I>not</I> close the Environment.
941 *
942 * @throws IOException if an error occurs closing a database object
943 */
944 public void close() throws IOException{
945 try{
946 secondaryUriDB.close();
947 processingUriDB.close();
948 primaryUriDB.close();
949 } catch (DatabaseException e) {
950
951 IOException e2 = new IOException(e.getMessage());
952 e2.setStackTrace(e.getStackTrace());
953 throw e2;
954 }
955 }
956
957
958 /***
959 * If true then the HQ has no available slot for issuing URIs.
960 * <p>
961 * I.e. number of in processing URIs = valence.
962 *
963 * @return true if number of in processing URIs = valence
964 */
965 private boolean isBusy(){
966 return inProcessing == valence;
967 }
968
969 /***
970 * Overwrites a used (-1) value in wakeUpTime[] with the supplied value.
971 * @param newVal
972 */
973 private void updateWakeUpTimeSlot(long newVal){
974 for(int i=0 ; i < valence ; i++){
975 if(wakeUpTime[i]==-1){
976 wakeUpTime[i]=newVal;
977 }
978 }
979 reorder();
980 }
981
982 /***
983 * A new URI is being issued. Set the wakeup time on an unused slot to -1.
984 *
985 * @return true if a slot was successfully reserved. False otherwise.
986 */
987 private boolean useWakeUpTimeSlot(){
988 for(int i=0 ; i < valence ; i++){
989 if(wakeUpTime[i]>-1 && wakeUpTime[i]<=System.currentTimeMillis()){
990 wakeUpTime[i]=-1;
991 return true;
992 }
993 }
994 reorder();
995 return false;
996 }
997
998 /***
999 * Returns the earliest time when a wake up slot will become available. If
1000 * one is already available then this time will be in the past.
1001 * <p>
1002 * If all slots are taken with URIs currently being processed (i.e. HQ state
1003 * is {@link #HQSTATE_BUSY busy} then this will return Long.MAX_VALUE;
1004 * @return the earliest time when a wake up slot will become available
1005 */
1006 private long getEarliestWakeUpTimeSlot(){
1007 long earliest = Long.MAX_VALUE;
1008 for(int i=0 ; i < valence ; i++){
1009 if(wakeUpTime[i]>-1 && wakeUpTime[i]<earliest){
1010 earliest = wakeUpTime[i];
1011 }
1012 }
1013 return earliest;
1014 }
1015
1016 /***
1017 * Returns a report detailing the status of this HQ.
1018 * @param max Maximum number of URIs to show. 0 equals no limit.
1019 * @return a report detailing the status of this HQ.
1020 */
1021 public String report(int max){
1022 try{
1023 StringBuffer ret = new StringBuffer(256);
1024 ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
1025 ret.append("Size: " + size + "\n");
1026 ret.append("State: " + getStateByName() + "\n");
1027 if(getState()==HQSTATE_BUSY){
1028 ret.append("Processing URIs: \n");
1029 Cursor processingCursor = processingUriDB.openCursor(null,null);
1030 reportURIs(ret, processingCursor, valence);
1031 processingCursor.close();
1032 } else {
1033 ret.append("Next ready: " +
1034 ArchiveUtils.formatMillisecondsToConventional(
1035 getNextReadyTime() - System.currentTimeMillis()) +
1036 "\n");
1037 }
1038 ret.append("Top URIs: \n");
1039
1040 Cursor secondaryCursor = secondaryUriDB.openCursor(null,null);
1041 reportURIs(ret,secondaryCursor,max);
1042 secondaryCursor.close();
1043 return ret.toString();
1044 } catch( DatabaseException e ){
1045 return "Exception occured compiling report:\n" + e.getMessage();
1046 }
1047 }
1048
1049 /***
1050 * Adds a report of the first <code>max</code> URIs that the cursor points
1051 * to to the stringbuffer object.
1052 *
1053 * @param ret The stringbuffer to append to
1054 * @param cursor The cursor pointing at a URI database
1055 * @param max Maximum number of URIs to report on. If fewer URIs are in the
1056 * database, all URIs are shown
1057 * @throws DatabaseException if an error occurs
1058 */
1059 private void reportURIs(StringBuffer ret, Cursor cursor, int max)
1060 throws DatabaseException{
1061 DatabaseEntry keyEntry = new DatabaseEntry();
1062 DatabaseEntry dataEntry = new DatabaseEntry();
1063 OperationStatus opStatus =
1064 cursor.getFirst(keyEntry,dataEntry,LockMode.DEFAULT);
1065 if(max == 0){
1066
1067 max = Integer.MAX_VALUE;
1068 }
1069 int i = 0;
1070 while(i<max && opStatus == OperationStatus.SUCCESS){
1071 CrawlURI tmp = (CrawlURI)crawlURIBinding.entryToObject(dataEntry);
1072 ret.append(" URI: " + tmp.toString() + "\n");
1073 switch(tmp.getSchedulingDirective()){
1074 case CandidateURI.HIGHEST :
1075 ret.append(" Sched. directive: HIGHEST\n"); break;
1076 case CandidateURI.HIGH :
1077 ret.append(" Sched. directive: HIGH\n"); break;
1078 case CandidateURI.MEDIUM :
1079 ret.append(" Sched. directive: MEDIUM\n"); break;
1080 case CandidateURI.NORMAL :
1081 ret.append(" Sched. directive: NORMAL\n"); break;
1082 }
1083 ret.append(" Next processing: ");
1084 long nextProcessing =
1085 tmp.getLong(A_TIME_OF_NEXT_PROCESSING) -
1086 System.currentTimeMillis();
1087 if(nextProcessing < 0){
1088 ret.append("Overdue ");
1089 nextProcessing = nextProcessing*-1;
1090 }
1091 ret.append(ArchiveUtils.formatMillisecondsToConventional(
1092 nextProcessing) + "\n");
1093 if(tmp.getFetchStatus()!=0){
1094 ret.append(" Last fetch status: " +
1095 tmp.getFetchStatus() + "\n");
1096 }
1097 if(tmp.containsKey(A_WAIT_INTERVAL)){
1098 ret.append(" Wait interval: " +
1099 ArchiveUtils.formatMillisecondsToConventional(
1100 tmp.getLong(A_WAIT_INTERVAL)) + "\n");
1101 }
1102 if(tmp.containsKey(A_NUMBER_OF_VISITS)){
1103 ret.append(" Visits: " + tmp.getInt(
1104 A_NUMBER_OF_VISITS) + "\n");
1105 }
1106 if(tmp.containsKey(A_NUMBER_OF_VERSIONS)){
1107 ret.append(" Versions: " + tmp.getInt(
1108 A_NUMBER_OF_VERSIONS) + "\n");
1109 }
1110
1111 opStatus = cursor.getNext(keyEntry,dataEntry,LockMode.DEFAULT);
1112 i++;
1113 }
1114 }
1115
1116 /***
1117 * Creates the secondary key for the secondary index.
1118 * <p>
1119 * The secondary index is the scheduling directive (first sorting) and
1120 * the time of next processing (sorted from earlies to latest within each
1121 * scheduling directive). If the scheduling directive is missing or
1122 * unknown NORMAL will be assumed.
1123 */
1124 private static class OrderOfProcessingKeyCreator
1125 extends TupleSerialKeyCreator {
1126
1127 /***
1128 * Constructor. Invokes parent constructor.
1129 *
1130 * @param classCatalog is the catalog to hold shared class information
1131 * and for a database should be a
1132 * StoredClassCatalog.
1133 * @param dataClass is the CrawlURI class.
1134 */
1135 public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
1136 Class dataClass) {
1137 super(classCatalog, dataClass);
1138 }
1139
1140
1141
1142
1143 public boolean createSecondaryKey(TupleInput primaryKeyInput,
1144 Object dataInput,
1145 TupleOutput indexKeyOutput) {
1146 CrawlURI curi = (CrawlURI)dataInput;
1147 int directive = curi.getSchedulingDirective();
1148
1149
1150 switch (directive) {
1151 case CandidateURI.HIGHEST:
1152 directive = 0;
1153 break;
1154 case CandidateURI.HIGH:
1155 directive = 1;
1156 break;
1157 case CandidateURI.MEDIUM:
1158 directive = 2;
1159 break;
1160 case CandidateURI.NORMAL:
1161 directive = 3;
1162 break;
1163 default:
1164 directive = 3;
1165 }
1166
1167 indexKeyOutput.writeInt(directive);
1168 long timeOfNextProcessing =
1169 curi.getLong(A_TIME_OF_NEXT_PROCESSING);
1170
1171 indexKeyOutput.writeLong(timeOfNextProcessing);
1172 return true;
1173 }
1174 }
1175
1176
1177
1178
1179 public CrawlSubstats getSubstats() {
1180 return substats;
1181 }
1182
1183 }